<h1>Spark Basics</h1>
<h2>A Word Count Example with Cached Partition</h2>
<div dir="ltr" style="text-align: left;" trbidi="on">
<pre class="prettyprint lang-scala linenums">
scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel
scala> val lines = sc.textFile("hdfs:///user/raj/data.txt", 3)
lines: org.apache.spark.rdd.RDD[String] = hdfs:///user/raj/data.txt MapPartitionsRDD[1] at textFile at <console>:28
scala> // No of partitions
scala> lines.partitions.size
res0: Int = 3
scala> // flatMap() : One of many transformations
scala> val words = lines.flatMap(x => x.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:30
scala> // Persist the data
scala> val units = words.map ( word => (word, 1) ).
| persist(StorageLevel.MEMORY_ONLY)
units: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:32
scala>
scala> val counts = units.reduceByKey ( (x, y) => x + y )
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:34
// Text file is read to compute the 'counts' RDD
scala> counts.toDebugString
res1: String =
(3) ShuffledRDD[4] at reduceByKey at <console>:34 []
+-(3) MapPartitionsRDD[3] at map at <console>:32 []
| MapPartitionsRDD[2] at flatMap at <console>:30 []
| hdfs:///user/raj/data.txt MapPartitionsRDD[1] at textFile at <console>:28 []
| hdfs:///user/raj/data.txt HadoopRDD[0] at textFile at <console>:28 []
scala> // First Action
scala> counts.collect()
res2: Array[(String, Int)] = Array((another,1), (This,2), (is,2), (a,1), (test,2))
scala> val counts2 = units.reduceByKey((x, y) => x * y)
counts2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[5] at reduceByKey at <console>:34
// Cached data is read to compute the 'counts2' RDD
scala> counts2.toDebugString
res3: String =
(3) ShuffledRDD[5] at reduceByKey at <console>:34 []
+-(3) MapPartitionsRDD[3] at map at <console>:32 []
| CachedPartitions: 3; MemorySize: 696.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| MapPartitionsRDD[2] at flatMap at <console>:30 []
| hdfs:///user/raj/data.txt MapPartitionsRDD[1] at textFile at <console>:28 []
| hdfs:///user/raj/data.txt HadoopRDD[0] at textFile at <console>:28 []
scala> // Second Action
scala> counts2.collect()
res4: Array[(String, Int)] = Array((another,1), (This,1), (is,1), (a,1), (test,1))
</pre>
</div>
<h2>Broadcast Variable Example</h2>
<div dir="ltr" style="text-align: left;" trbidi="on">
<pre class="prettyprint lang-scala linenums">
scala> // Sending a value from Driver to Worker Nodes without
scala> // using Broadcast variable
scala> val input = sc.parallelize(List(1, 2, 3))
input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:27
scala> val localVal = 2
localVal: Int = 2
scala> val added = input.map( x => x + localVal)
added: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[18] at map at <console>:31
scala> added.foreach(println)
4
3
5
scala> //** Local variable is once again transferred to worker nodes
scala> // for the next operation
scala> val multiplied = input.map( x => x * 2)
multiplied: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[19] at map at <console>:29
scala> multiplied.foreach(println)
4
6
2
</pre>
<pre class="prettyprint lang-scala linenums">
scala> // Sending a read-only value using Broadcast variable
scala> // Can be used to send large read-only values to all worker
scala> // nodes efficiently
scala> val broadcastVar = sc.broadcast(2)
broadcastVar: org.apache.spark.broadcast.Broadcast[Int] = Broadcast(14)
scala> val added = input.map(x => broadcastVar.value + x)
added: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[20] at map at <console>:31
scala> added.foreach(println)
5
3
4
scala> val multiplied = input.map(x => broadcastVar.value * x)
multiplied: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[21] at map at <console>:31
scala> multiplied.foreach(println)
6
4
2
scala>
</pre>
</div>
<h2>A Word Count example using 'spark-shell'</h2>
<div dir="ltr" style="text-align: left;" trbidi="on">
<pre class="prettyprint lang-scala linenums">[raj@Rajkumars-MacBook-Pro ~]$spark-shell --master local[*]
2016-05-28 15:37:24.325 java[3907:6309927] Unable to load realm info from SCDynamicStore
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.6.1
/_/
Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.
scala> val lines = sc.parallelize(List("This is a word", "This is another word"), 7)
lines: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> // No of partitions
scala> lines.partitions.size
res0: Int = 7
scala> // flatMap() : One of many transformations
scala> val words = lines.flatMap(line => line.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at flatMap at <console>:29
scala> val units = words.map ( word => (word, 1) )
units: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[2] at map at <console>:31
scala> val counts = units.reduceByKey ( (x, y) => x + y )
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[3] at reduceByKey at <console>:33
scala> counts.toDebugString
res1: String =
(7) ShuffledRDD[3] at reduceByKey at <console>:33 []
+-(7) MapPartitionsRDD[2] at map at <console>:31 []
| MapPartitionsRDD[1] at flatMap at <console>:29 []
| ParallelCollectionRDD[0] at parallelize at <console>:27 []
scala> // collect() : One of many actions
scala> counts.collect()
res2: Array[(String, Int)] = Array((This,2), (is,2), (another,1), (a,1), (word,2))
</pre>
<br /></div>
<h2>Accumulator : Example</h2>
<div dir="ltr" style="text-align: left;" trbidi="on">
<div class="p1">
<b><u><span style="color: #cc0000;">Note</span></u></b> : Use <span style="font-family: "courier new" , "courier" , monospace;">Accumulator</span> only in <span style="font-family: "courier new" , "courier" , monospace;">action</span> to get correct values. Do not use <span style="font-family: "courier new" , "courier" , monospace;">Accumulator</span> in <span style="font-family: "courier new" , "courier" , monospace;">Transformation</span> ; Use it only <span class="s1">for</span> debugging purpose in <span style="font-family: Courier New, Courier, monospace;">Transformation</span></div>
<pre class="prettyprint lang-scala linenums">scala> val input = sc.parallelize(List(1, 2, 3, 4, 5,
| 6, 7, 8, 9, 10,
| 11, 12, 13, 14, 15
| ))
input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> println("No of partitions -> " + input.partitions.size)
No of partitions -> 8
scala> val myAccum = sc.accumulator(0, "My Accumulator")
myAccum: org.apache.spark.Accumulator[Int] = 0
scala> // Used inside an action
scala> input.foreach{ x =>
| //Thread.sleep(50000)
| myAccum += 1
| }
scala> println("myAccum -> " + myAccum.value)
myAccum -> 15
</pre>
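The caveat in the note above can be sketched as follows (this block is illustrative and not part of the original session) : because an RDD can be recomputed, an accumulator updated inside a transformation may be incremented more than once per element.<br />
<pre class="prettyprint lang-scala">// A sketch : incrementing an accumulator inside a transformation
val txAccum = sc.accumulator(0, "Transformation Accumulator")
val mapped = input.map { x => txAccum += 1; x * 2 }

mapped.count()  // txAccum.value is 15 after this action
mapped.count()  // 'mapped' is not persisted, so it is recomputed
                // and txAccum.value can now grow to 30
</pre>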
<span style="color: #999999; font-family: "courier new" , "courier" , monospace;">c15 > a15</span></div>
<h2>Setting Partitioner for RDD</h2>
<div dir="ltr" style="text-align: left;" trbidi="on">
When doing a Join on Pair RDDs, if one of the datasets we are using is master data, it makes a lot of sense to persist it, as we do not want the RDD to be recomputed every time an action associated with the dataset is executed.<br />
<br />
Apart from enabling persistence for the master dataset, we can also avoid shuffling it on every join by providing a Partitioner<br />
<br />
<b style="background-color: #cccccc;">Example</b><br />
<pre class="prettyprint lang-scala">scala> val masterdata =
| sc.
| parallelize(Seq(
| ("math", 55),
| ("math", 56),
| ("english", 57),
| ("english", 58),
| ("science", 59),
| ("science", 54))).
| partitionBy(new HashPartitioner(100)).
| persist()
masterdata: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[42] at partitionBy at <console>:31
scala> masterdata.partitioner
res16: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@64)
scala> masterdata.partitions.length
res17: Int = 100
</pre>
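<br />
As a sketch of the benefit (the 'events' RDD below is illustrative and not part of the original session), joining another Pair RDD against the pre-partitioned master data lets Spark reuse the existing HashPartitioner, so only the smaller side is shuffled :<br />
<pre class="prettyprint lang-scala">// Hypothetical detail dataset keyed by the same subjects
val events = sc.parallelize(Seq(("math", "midterm"), ("science", "final")))

// 'masterdata' already carries a partitioner, so it is not
// re-shuffled ; the join output also inherits that partitioner
val joined = masterdata.join(events)
joined.partitioner  // Some(org.apache.spark.HashPartitioner@64)
</pre>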
<br />
Following are the operations in which shuffling can be avoided/minimized by using a Partitioner<br />
<span style="font-family: "courier new" , "courier" , monospace;">- cogroup()</span><br />
<span style="font-family: "courier new" , "courier" , monospace;">- groupWith()</span><br />
<span style="font-family: "courier new" , "courier" , monospace;">- join()</span><br />
<span style="font-family: "courier new" , "courier" , monospace;">- leftOuterJoin()</span><br />
<span style="font-family: "courier new" , "courier" , monospace;">- rightOuterJoin()</span><br />
<span style="font-family: "courier new" , "courier" , monospace;">- groupByKey()</span><br />
<span style="font-family: "courier new" , "courier" , monospace;">- reduceByKey()</span><br />
<span style="font-family: "courier new" , "courier" , monospace;">- combineByKey()</span><br />
<span style="font-family: "courier new" , "courier" , monospace;">- lookup()</span><br />
<br />
Operations that <u>preserve</u> the Partitioner of the Parent RDD (or) result in a Partitioner being set<br />
<span style="font-family: "courier new" , "courier" , monospace;">- cogroup()</span><br />
<span style="font-family: "courier new" , "courier" , monospace;">- groupWith()</span><br />
<span style="font-family: "courier new" , "courier" , monospace;">- join()</span><br />
<span style="font-family: "courier new" , "courier" , monospace;">- leftOuterJoin()</span><br />
<span style="font-family: "courier new" , "courier" , monospace;">- rightOuterJoin()</span><br />
<span style="font-family: "courier new" , "courier" , monospace;">- groupByKey()</span><br />
<span style="font-family: "courier new" , "courier" , monospace;">- reduceByKey()</span><br />
<span style="font-family: "courier new" , "courier" , monospace;">- combineByKey()</span><br />
<span style="font-family: "courier new" , "courier" , monospace;">- partitionBy()</span><br />
<span style="font-family: "courier new" , "courier" , monospace;">- sort()</span><br />
<span style="font-family: "courier new" , "courier" , monospace;">- mapValues()</span><br />
<br />
Operations that will not set a Partitioner<br />
<span style="font-family: "courier new" , "courier" , monospace;">- map()</span><br />
<br />
<h3 style="text-align: left;">
<b><u><span style="color: #cc0000;">Reference</span></u></b></h3>
<br />
Learning Spark : Partitioning : 63, 66</div>
<h2>parallelize() Examples</h2>
<div dir="ltr" style="text-align: left;" trbidi="on">
<pre class="prettyprint lang-scala">scala> //for loop Example
scala> val rdd = sc.parallelize(for {
| x <- 1 to 3
| y <- 1 to 2
| } yield (x, None))
rdd: org.apache.spark.rdd.RDD[(Int, None.type)] = ParallelCollectionRDD[38] at parallelize at <console>:21
scala>
scala> rdd.collect()
res15: Array[(Int, None.type)] = Array((1,None), (1,None), (2,None), (2,None), (3,None), (3,None))
</pre>
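<br />
A second sketch : <span style="font-family: Courier New, Courier, monospace;">parallelize()</span> also accepts the desired number of partitions as its second argument.<br />
<pre class="prettyprint lang-scala">val rdd2 = sc.parallelize(1 to 10, 4)  // ask for 4 partitions
rdd2.partitions.size                   // Int = 4
rdd2.collect()                         // Array(1, 2, ..., 10)
</pre>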
<br />
<h3 style="text-align: left;">
<b><u><span style="color: #cc0000;">Reference</span></u></b></h3>
<span style="font-family: "courier new" , "courier" , monospace;">for loop</span> with <span style="font-family: "courier new" , "courier" , monospace;">yield</span> : <a href="http://alvinalexander.com/scala/scala-for-loop-yield-examples-yield-tutorial" target="_blank">http://alvinalexander.com/scala/scala-for-loop-yield-examples-yield-tutorial</a><br />
<br /></div>
<h2>Actions on Pair RDDs : countByKey(), collectAsMap() & lookup()</h2>
<div dir="ltr" style="text-align: left;" trbidi="on">
<div dir="ltr" style="text-align: left;" trbidi="on">
<b style="background-color: #cccccc;">Example</b><br />
<pre class="prettyprint lang-scala">scala> val rdd = sc.parallelize(Seq(
| ("math", 55),
| ("math", 56),
| ("english", 57),
| ("english", 58),
| ("science", 59),
| ("science", 54)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[31] at parallelize at <console>:21
scala> //Example : countByKey()
scala> val result1 = rdd.countByKey()
result1: scala.collection.Map[String,Long] = Map(math -> 2, english -> 2, science -> 2)
scala> //Example : collectAsMap()
scala> val result2 = rdd.collectAsMap()
result2: scala.collection.Map[String,Int] = Map(math -> 56, science -> 54, english -> 58)
scala> //Example : lookup()
scala> val result3 = rdd.lookup("math")
result3: Seq[Int] = WrappedArray(55, 56)
</pre>
</div>
</div>
<h2>sortByKey() Example</h2>
<div dir="ltr" style="text-align: left;" trbidi="on">
<span style="color: #c27ba0; font-family: "courier new" , "courier" , monospace;"><b>sortByKey()</b></span> is part of <span style="color: #c27ba0; font-family: "courier new" , "courier" , monospace;"><b>OrderedRDDFunctions</b></span> that works on Key/Value pairs. The official documentation for <a href="http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.OrderedRDDFunctions" target="_blank"><span style="font-family: "courier new" , "courier" , monospace;">OrderedRDDFunctions</span></a> states that,<br />
<pre class="prettyprint lang-scala">class OrderedRDDFunctions[K, V, P &lt: Product2[K, V]] extends Logging with Serializable
Extra functions available on RDDs of (key, value) pairs where the key is sortable through an implicit conversion. They will work with any key type K that has an implicit Ordering[K] in scope. Ordering objects already exist for all of the standard primitive types. Users can also define their own orderings for custom types, or to override the default ordering. The implicit ordering that is in the closest scope will be used.
</pre>
<b style="background-color: #cccccc;">Syntax : <span style="font-family: "courier new" , "courier" , monospace;">sortByKey()</span></b><br />
<pre class="prettyprint lang-scala">def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling collect or save on the resulting RDD will return or output an ordered list of records (in the save case, they will be written to multiple part-X files in the filesystem, in order of the keys).
</pre>
<b style="background-color: #cccccc;">Example</b><br />
<pre class="prettyprint lang-scala">scala> //Input
scala> val rdd = sc.parallelize(Seq(
| ("math", 55),
| ("math", 56),
| ("english", 57),
| ("english", 58),
| ("science", 59),
| ("science", 54)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[17] at parallelize at <console>:21
scala> rdd.collect()
res8: Array[(String, Int)] = Array((math,55), (math,56), (english,57), (english,58), (science,59), (science,54))
scala> //Default Sorting : Ascending order
scala> val sorted1 = rdd.sortByKey()
sorted1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[20] at sortByKey at <console>:23
scala> //Result
scala> sorted1.collect()
res9: Array[(String, Int)] = Array((english,57), (english,58), (math,55), (math,56), (science,59), (science,54))
scala> //Custom Sorting : Descending order (using implicit 'Ordering')
scala> {
| //Let us define an implicit sorting for the method sortByKey()
| //We have used '{' above to limit the scope of the implicit ordering
| implicit val sortIntegersByString = new Ordering[String] {
| override def compare(a: String, b: String) = {
| val result = a.compare(b)
| //We use -ve to sort the key in descending order
| -result
| }
| }
| val sorted2 = rdd.sortByKey()
|
| //Result
| sorted2.collect()
| }
res10: Array[(String, Int)] = Array((science,59), (science,54), (math,55), (math,56), (english,57), (english,58))
scala> //Default Sorting : Descending order (done using the 'ascending' flag argument)
scala> val sorted3 = rdd.sortByKey(false)
sorted3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[26] at sortByKey at <console>:23
scala> //Result
scala> sorted3.collect()
res11: Array[(String, Int)] = Array((science,59), (science,54), (math,55), (math,56), (english,57), (english,58))
</pre>
<br />
<b style="background-color: #cccccc;">Note</b> : <span style="font-family: "courier new" , "courier" , monospace;">sortByKey()</span> results in range-partitioned RDDs<br />
<br />
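This can be verified from the shell ; a quick sketch against the session above :<br />
<pre class="prettyprint lang-scala">sorted1.partitioner  // expected : Some(org.apache.spark.RangePartitioner@...)
</pre>
<br />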
<h3 style="text-align: left;">
<b><u><span style="color: #cc0000;">Reference</span></u></b></h3>
<br />
Learning Spark : Range Partition : 64</div>
<h2>join(), leftOuterJoin() & rightOuterJoin() Example</h2>
<div dir="ltr" style="text-align: left;" trbidi="on">
<b style="background-color: #cccccc;">Example</b><br />
<pre class="prettyprint lang-scala">
scala> //Input Data
scala> val rdd1 = sc.parallelize(Seq(
| ("math", 55),
| ("math", 56),
| ("english", 57),
| ("english", 58),
| ("science", 59),
| ("science", 54)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[45] at parallelize at <console>:21
scala> val rdd2 = sc.parallelize(Seq(
| ("math", 60),
| ("math", 65),
| ("science", 61),
| ("science", 62),
| ("history", 63),
| ("history", 64)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[46] at parallelize at <console>:21
scala> //join() Example
scala> val joined = rdd1.join(rdd2)
joined: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[49] at join at <console>:25
scala> //Result
scala> joined.collect()
res15: Array[(String, (Int, Int))] = Array((math,(55,60)), (math,(55,65)), (math,(56,60)), (math,(56,65)), (science,(59,61)), (science,(59,62)), (science,(54,61)), (science,(54,62)))
scala> //leftOuterJoin() Example
scala> val leftJoined = rdd1.leftOuterJoin(rdd2)
leftJoined: org.apache.spark.rdd.RDD[(String, (Int, Option[Int]))] = MapPartitionsRDD[52] at leftOuterJoin at <console>:25
scala> //Result
scala> leftJoined.collect()
res16: Array[(String, (Int, Option[Int]))] = Array((math,(55,Some(60))), (math,(55,Some(65))), (math,(56,Some(60))), (math,(56,Some(65))), (english,(57,None)), (english,(58,None)), (science,(59,Some(61))), (science,(59,Some(62))), (science,(54,Some(61))), (science,(54,Some(62))))
scala> //rightOuterJoin() Example
scala> val rightJoined = rdd1.rightOuterJoin(rdd2)
rightJoined: org.apache.spark.rdd.RDD[(String, (Option[Int], Int))] = MapPartitionsRDD[55] at rightOuterJoin at <console>:25
scala> //Result
scala> rightJoined.collect()
res17: Array[(String, (Option[Int], Int))] = Array((math,(Some(55),60)), (math,(Some(55),65)), (math,(Some(56),60)), (math,(Some(56),65)), (history,(None,63)), (history,(None,64)), (science,(Some(59),61)), (science,(Some(59),62)), (science,(Some(54),61)), (science,(Some(54),62)))
</pre>
<br /></div>
<h2>cogroup() Example</h2>
<div dir="ltr" style="text-align: left;" trbidi="on">
<b style="background-color: #cccccc;">Example</b><br />
<br />
Multiple Pair RDDs can be combined using <span style="color: #c27ba0; font-family: "courier new" , "courier" , monospace;"><b>cogroup()</b></span><br />
<br />
<pre class="prettyprint class=lang-scala">scala> val rdd1 = sc.parallelize(Seq(
| ("key1", 1),
| ("key2", 2),
| ("key1", 3)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[25] at parallelize at <console>:21
scala> val rdd2 = sc.parallelize(Seq(
| ("key1", 5),
| ("key2", 4)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[26] at parallelize at <console>:21
//cogroup() Example
scala> val grouped = rdd1.cogroup(rdd2)
grouped: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[28] at cogroup at <console>:25
//Result
scala> grouped.collect()
res10: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((key1,(CompactBuffer(1, 3),CompactBuffer(5))), (key2,(CompactBuffer(2),CompactBuffer(4))))
// Iterate through each value in key
// and increment the value by '1'
scala> val updated = grouped.map{x =>
| {
| val key = x._1
| //println("Key -> " + key)
| val value = x._2
| val itl1 = value._1
| val itl2 = value._2
| val res1 = itl1.map{ x =>
| {
| //println("It1 : Key -> " + key + ", Val -> " + (x + 1))
| x + 1
| }
| }
| val res2 = itl2.map{ x =>
| {
| //println("It2 : Key -> " + key + ", Val -> " + (x + 1))
| x + 1
| }
| }
| //println("End")
| (key, (res1, res2))
| }
| }
updated: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[18] at map at <console>:33
scala> updated.collect()
res17: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((key1,(List(2, 4),List(6))), (key2,(List(3),List(5))))
</pre>
<br /></div>
<h2>groupBy() & groupByKey() Example</h2>
<div dir="ltr" style="text-align: left;" trbidi="on">
<div dir="ltr" style="text-align: left;" trbidi="on">
<span style="color: #c27ba0; font-family: "courier new" , "courier" , monospace;"><b>groupByKey()</b></span> operates on Pair RDDs and is used to group all the values related to a given key. <b><span style="color: #c27ba0; font-family: "courier new" , "courier" , monospace;">groupBy()</span></b> can be used in both unpaired & paired RDDs. When used with unpaired data, the key for <span style="font-family: "courier new" , "courier" , monospace;">groupBy()</span> is decided by the function literal passed to the method<br />
<br />
<span style="background-color: #cccccc;"><b>Example</b></span><br />
<br />
<pre class="prettyprint lang-scala">scala> val inputrdd = sc.parallelize(Seq(
| ("key1", 1),
| ("key2", 2),
| ("key1", 3)))
inputrdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[12] at parallelize at <console>:21
//groupByKey() Example
scala> val grouped1 = inputrdd.groupByKey()
grouped1: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[13] at groupByKey at <console>:23
scala> grouped1.collect()
res6: Array[(String, Iterable[Int])] = Array((key1,CompactBuffer(1, 3)), (key2,CompactBuffer(2)))
//groupBy() Example : Find Odd & Even numbers
scala> val grouped2 = inputrdd.groupBy{ x =>
| if((x._2 % 2) == 0) {
| "evennumbers"
| }else {
| "oddnumbers"
| }
| }
grouped2: org.apache.spark.rdd.RDD[(String, Iterable[(String, Int)])] = ShuffledRDD[15] at groupBy at <console>:23
scala> grouped2.collect()
res7: Array[(String, Iterable[(String, Int)])] = Array((evennumbers,CompactBuffer((key2,2))), (oddnumbers,CompactBuffer((key1,1), (key1,3))))
</pre>
<br />
<b style="background-color: #cccccc;">Note</b><br />
<br />
<span style="font-family: "courier new" , "courier" , monospace;">groupByKey()</span> always results in Hash-Partitioned RDDs<br />
<br />
<h3 style="text-align: left;">
<b><u><span style="color: #cc0000;">Reference</span></u></b></h3>
<br />
Learning Spark : Hash-Partition : 64<br />
<br />
<br /></div>
</div>
<h2>Setting up Partition size</h2>
<div dir="ltr" style="text-align: left;" trbidi="on">
<b style="background-color: #cccccc;">Example</b><br />
<pre class="prettyprint lang-scala">scala> val inputrdd = sc.parallelize(Seq(
| ("key1", 1),
| ("key2", 2),
| ("key1", 3)))
inputrdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[80] at parallelize at <console>:21
scala> val noPartition = inputrdd.reduceByKey((x, y) => x + y)
noPartition: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[81] at reduceByKey at <console>:23
scala> noPartition.partitions.length
res50: Int = 8
scala> //Here Partition size is given as a second argument
scala> val withPartition = inputrdd.reduceByKey((x, y) => x + y, 11)
withPartition: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[82] at reduceByKey at <console>:23
scala> withPartition.partitions.length
res51: Int = 11
scala> val repartitioned = withPartition.repartition(16)
repartitioned: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[86] at repartition at <console>:25
scala> repartitioned.partitions.length
res52: Int = 16
scala> val coalesced = if(4 < repartitioned.partitions.length) {
| //Note : Use coalesce() only when the new partition size is
| // less than the current partition size of the RDD
| repartitioned.coalesce(4)
| }else {
| repartitioned
| }
coalesced: org.apache.spark.rdd.RDD[(String, Int)] = CoalescedRDD[87] at coalesce at <console>:30
scala> coalesced.partitions.length
res53: Int = 4</pre>
<br />
<h3 style="text-align: left;">
<b><u><span style="color: #cc0000;">Reference</span></u></b></h3>
<br />
Learning Spark : Page 57</div>
<h2>Combiner in Pair RDDs : combineByKey()</h2>
<div dir="ltr" style="text-align: left;" trbidi="on">
Similar to the combiner in MapReduce, when working with key/value pairs, the <span style="color: #c27ba0; font-family: Courier New, Courier, monospace;"><b>combineByKey()</b></span> interface can be used to customize the combiner functionality. Methods like <span style="font-family: Courier New, Courier, monospace;">reduceByKey()</span> by default use their own combiner to combine the data locally in each Partition for a given key<br />
<div dir="ltr" style="text-align: left;" trbidi="on">
<div>
<br /></div>
<div>
Similar to <span style="font-family: Courier New, Courier, monospace;">aggregate()</span> (which is used on plain, non-pair RDDs), <span style="font-family: Courier New, Courier, monospace;">combineByKey()</span> allows the user to return an RDD element type that is different from the element type of the input RDD</div>
<div>
<br />
<b style="background-color: #cccccc;">Syntax</b><br />
<pre class="prettyprint lang-scala">def combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
Generic function to combine the elements for each key using a custom set of aggregation functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C. Note that V and C can be different -- for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
- createCombiner, which turns a V into a C (e.g., creates a one-element list) - mergeValue, to merge a V into a C (e.g., adds it to the end of a list) - mergeCombiners, to combine two C's into a single one.
In addition, users can control the partitioning of the output RDD, and whether to perform map-side aggregation (if a mapper can produce multiple items with the same key).
</pre>
</div>
<div>
<br />
Here,<br />
<b><span style="font-family: "courier new" , "courier" , monospace;">1st Argument : </span><span style="color: #c27ba0; font-family: "courier new" , "courier" , monospace;">createCombiner</span></b> is called when a key(in the RDD element) is found for the first time in a given Partition. This method creates an initial value for the accumulator for that key<br />
<b><span style="font-family: "courier new" , "courier" , monospace;">2nd Argument : </span><span style="color: #c27ba0; font-family: "courier new" , "courier" , monospace;">mergeValue</span></b> is called when the key already has an accumulator<br />
<b><span style="font-family: "courier new" , "courier" , monospace;">3rd Argument : </span><span style="color: #c27ba0; font-family: "courier new" , "courier" , monospace;">mergeCombiners</span></b> is called when more that one partition has accumulator for the same key<br />
<br />
<b style="background-color: #cccccc;">Example</b><br />
<br />
Let us calculate the average in each subject using <span style="font-family: "courier new" , "courier" , monospace;">combineByKey()</span><br />
<pre class="prettyprint lang-scala">scala> val inputrdd = sc.parallelize(Seq(
| ("maths", 50), ("maths", 60),
| ("english", 65),
| ("physics", 66), ("physics", 61), ("physics", 87)),
| 1)
inputrdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[41] at parallelize at <console>:27
scala> inputrdd.getNumPartitions
res55: Int = 1
scala> val reduced = inputrdd.combineByKey(
| (mark) => {
| println(s"Create combiner -> ${mark}")
| (mark, 1)
| },
| (acc: (Int, Int), v) => {
| println(s"""Merge value : (${acc._1} + ${v}, ${acc._2} + 1)""")
| (acc._1 + v, acc._2 + 1)
| },
| (acc1: (Int, Int), acc2: (Int, Int)) => {
| println(s"""Merge Combiner : (${acc1._1} + ${acc2._1}, ${acc1._2} + ${acc2._2})""")
| (acc1._1 + acc2._1, acc1._2 + acc2._2)
| }
| )
reduced: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[42] at combineByKey at <console>:29
scala> reduced.collect()
Create combiner -> 50
Merge value : (50 + 60, 1 + 1)
Create combiner -> 65
Create combiner -> 66
Merge value : (66 + 61, 1 + 1)
Merge value : (127 + 87, 2 + 1)
res56: Array[(String, (Int, Int))] = Array((maths,(110,2)), (physics,(214,3)), (english,(65,1)))
scala> val result = reduced.mapValues(x => x._1 / x._2.toFloat)
result: org.apache.spark.rdd.RDD[(String, Float)] = MapPartitionsRDD[43] at mapValues at <console>:31
scala> result.collect()
res57: Array[(String, Float)] = Array((maths,55.0), (physics,71.333336), (english,65.0))
</pre>
<br />
<b style="background-color: #cccccc;">Note</b><br />
<br />
The <u>map-side aggregation</u> done using <span style="font-family: Courier New, Courier, monospace;">combineByKey()</span> can also be disabled (as is the case with methods like <span style="color: #c27ba0; font-family: Courier New, Courier, monospace;"><b>groupByKey()</b></span>, where the functionality of the combiner is not needed) ; a sketch follows<br />
<br />
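A sketch of disabling it via the full overload quoted in the Syntax section (the HashPartitioner(2) and the list-building combiner here are illustrative choices, not from the original session) :<br />
<pre class="prettyprint lang-scala">import org.apache.spark.HashPartitioner

val groupedMarks = inputrdd.combineByKey(
  (mark: Int) => List(mark),                            // createCombiner
  (acc: List[Int], mark: Int) => mark :: acc,           // mergeValue
  (acc1: List[Int], acc2: List[Int]) => acc1 ::: acc2,  // mergeCombiners
  new HashPartitioner(2),
  mapSideCombine = false)  // groupBy-like usage : no map-side combining
</pre>
<br />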
<h3 style="text-align: left;">
<b><u><span style="color: #cc0000;">Reference</span></u></b></h3>
<br />
Learning Spark : Page 54</div>
</div>
</div>
<h2>mapValues() Example</h2>
<div dir="ltr" style="text-align: left;" trbidi="on">
When we use <span style="font-family: Courier New, Courier, monospace;">map()</span> with a <span style="font-family: Courier New, Courier, monospace;">Pair RDD</span>, we get access to both the key & the value. There are times we might only be interested in accessing the value (& not the key). In those cases, we can use <span style="color: #c27ba0; font-family: Courier New, Courier, monospace;"><b>mapValues()</b></span> instead of <span style="font-family: Courier New, Courier, monospace;">map()</span>.<br />
<div>
<br /></div>
<div>
In this example we use <span style="font-family: Courier New, Courier, monospace;">mapValues()</span> along with <span style="font-family: Courier New, Courier, monospace;">reduceByKey()</span> to calculate the average for each subject</div>
<div>
<br /></div>
<pre class="prettyprint lang-scala">scala> val inputrdd = sc.parallelize(Seq(("maths", 50), ("maths", 60), ("english", 65)))
inputrdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[29] at parallelize at <console>:21
scala> val mapped = inputrdd.mapValues(mark => (mark, 1));
mapped: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[30] at mapValues at <console>:23
scala> val reduced = mapped.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
reduced: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[31] at reduceByKey at <console>:25
scala> val average = reduced.map { x =>
| val temp = x._2
| val total = temp._1
| val count = temp._2
| (x._1, total / count)
| }
average: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[32] at map at <console>:27
scala>
| average.collect()
res30: Array[(String, Int)] = Array((english,65), (maths,55))
</pre>
<div>
<br />
<b style="background-color: #cccccc;">Note </b><br />
<br />
Operations like <span style="font-family: Courier New, Courier, monospace;">map()</span> always cause the new RDD to not retain the parent's partitioning information ; a sketch follows<br />
<br />
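A sketch illustrating this (the partitionBy() call and the HashPartitioner import are additions made for the demonstration) :<br />
<pre class="prettyprint lang-scala">import org.apache.spark.HashPartitioner

val partitioned = inputrdd.partitionBy(new HashPartitioner(4))
partitioned.mapValues(v => v + 1).partitioner
// Some(org.apache.spark.HashPartitioner@4) : partitioner preserved
partitioned.map(kv => (kv._1, kv._2 + 1)).partitioner
// None : map() drops the partitioner
</pre>
<br />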
<h3 style="text-align: left;">
<b><u><span style="color: #cc0000;">Reference</span></u></b></h3>
Learning Spark : Partitioning : 64<br />
<br /></div>
</div>
<h2>Pair RDD generation</h2>
<div dir="ltr" style="text-align: left;" trbidi="on">
<h3 style="text-align: left;">
<b><u><span style="color: #cc0000;">Using <span style="font-family: "courier new" , "courier" , monospace;">parallelize()</span> method</span></u></b></h3>
<div>
<br />
By creating Tuples using the <span style="font-family: Courier New, Courier, monospace;">parallelize()</span> method, a Pair RDD can be created</div>
<div>
<br />
<pre class="prettyprint lang-scala">scala> val inputrdd = sc.parallelize(Seq(("key1", "val1"), ("key1", "val1")))
inputrdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[3] at parallelize at <console>:21
scala> inputrdd.reduceByKey((x, y) => x.concat(y)).collect()
res0: Array[(String, String)] = Array((key1,val1val1))
</pre>
</div>
<div>
<br />
<div>
Note :<br />
<span style="font-family: "courier new" , "courier" , monospace;">reduceByKey()</span> is available in class <span style="font-family: "courier new" , "courier" , monospace;">PairRDDFunctions</span></div>
</div>
Pair RDDs are simply RDDs of <span style="color: #c27ba0; font-family: Courier New, Courier, monospace;"><b>Tuple2</b></span> objects<br />
<br />
<h3 style="text-align: left;">
<b><u><span style="color: #cc0000;">Using <span style="font-family: "courier new" , "courier" , monospace;">map()</span> method</span></u></b></h3>
<br />
By using the <span style="font-family: Courier New, Courier, monospace;">map()</span> method to create key/value pairs, a Pair RDD can be created<br />
<br />
<pre class="prettyprint lang-scala">scala> val inputrdd = sc.parallelize(List("key1 1", "key1 3"))
inputrdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[15] at parallelize at <console>:21
scala> val mapped = inputrdd.map { x =>
| val splitted = x.split(" ")
| (splitted(0), splitted(1).toInt)
| }
mapped: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[16] at map at <console>:23
scala> val reduced = mapped.reduceByKey((x, y) => x + y)
reduced: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[17] at reduceByKey at <console>:25
scala> reduced.collect()
res24: Array[(String, Int)] = Array((key1,4))
</pre>
<br /></div>
<h2>countByValue() Example</h2>
<div dir="ltr" style="text-align: left;" trbidi="on">
<b><span style="color: #c27ba0; font-family: "courier new" , "courier" , monospace;">countByValue()</span></b> is an action that returns the <b><span style="font-family: "courier new" , "courier" , monospace;">Map</span></b> of each unique value with its count<br />
<br />
<b style="background-color: #999999;">Syntax</b>
<pre class="prettyprint class=lang-scala">def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]
Return the count of each unique value in this RDD as a local map of (value, count) pairs.
Note that this method should only be used if the resulting map is expected to be small, as the whole thing is loaded into the driver's memory. To handle very large results, consider using rdd.map(x => (x, 1L)).reduceByKey(_ + _), which returns an RDD[T, Long] instead of a map.
</pre>
<br />
<b style="background-color: #999999;">Example</b><br />
<pre class="prettyprint class=lang-scala">scala> val inputrdd = sc.parallelize{ Seq(10, 4, 3, 3) }
inputrdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:47
scala> inputrdd.countByValue()
res34: scala.collection.Map[Int,Long] = Map(10 -> 1, 3 -> 2, 4 -> 1)
</pre>
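<br />
A sketch of the alternative suggested in the documentation above, which keeps the counts as a distributed RDD instead of a driver-side Map :<br />
<pre class="prettyprint lang-scala">val counts = inputrdd.map(x => (x, 1L)).reduceByKey(_ + _)
counts.collect()  // Array((10,1), (4,1), (3,2)) in some order
</pre>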
<br />
<h3 style="text-align: left;">
<b><u><span style="color: #cc0000;">Reference</span></u></b></h3>
<br />
Learning Spark : 41<br />
<a href="http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD" target="_blank">http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD</a><br />
<br /></div>
<h2>takeSample() Example</h2>
<div dir="ltr" style="text-align: left;" trbidi="on">
<span style="color: #c27ba0; font-family: Courier New, Courier, monospace;"><b>takeSample()</b></span> is an action that is used to return a fixed-size sample subset of an RDD<br />
<br />
<b style="background-color: #999999;">Syntax</b><br />
<pre class="prettyprint lang-scala">def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
Return a fixed-size sampled subset of this RDD in an array
withReplacement whether sampling is done with replacement
num size of the returned sample
seed seed for the random number generator
returns sample of specified size in an array
</pre>
<br />
<b style="background-color: #999999;">Example</b><br />
<pre class="prettyprint lang-scala">scala> val inputrdd = sc.parallelize{ Seq(10, 4, 5, 3, 11, 2, 6) }
inputrdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at <console>:47
scala> inputrdd.takeSample(false, 3, System.nanoTime.toInt)
res29: Array[Int] = Array(6, 11, 10)
scala> inputrdd.takeSample(false, 3, System.nanoTime.toInt)
res30: Array[Int] = Array(5, 11, 4)
scala> inputrdd.takeSample(true, 3, System.nanoTime.toInt)
res31: Array[Int] = Array(10, 11, 5)
</pre>
<br />
<h3 style="text-align: left;">
<b><u><span style="color: #cc0000;">Reference</span></u></b></h3>
<br />
Learning Spark : 41<br />
<a href="http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD" target="_blank">http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD</a></div>Rajhttp://www.blogger.com/profile/14196501504489758397noreply@blogger.comtag:blogger.com,1999:blog-7670853579844823223.post-70830421459388695842015-11-24T20:31:00.000-05:002016-05-28T13:35:43.860-04:00top() & takeOrdered() Example<div dir="ltr" style="text-align: left;" trbidi="on">
<span style="color: #c27ba0; font-family: "courier new" , "courier" , monospace;"><b>top()</b></span> & <b><span style="color: #c27ba0; font-family: "courier new" , "courier" , monospace;">takeOrdered()</span></b> are actions that return the N elements based on the default ordering or the Customer ordering provided by us<br />
<br />
<b style="background-color: #cccccc;">Syntax</b><br />
<pre class="prettyprint lang-scala">def top(num: Int)(implicit ord: Ordering[T]): Array[T]
Returns the top k (largest) elements from this RDD as defined by the specified implicit Ordering[T]. This does the opposite of takeOrdered. For example:
sc.parallelize(Seq(10, 4, 2, 12, 3)).top(1)
// returns Array(12)
sc.parallelize(Seq(2, 3, 4, 5, 6)).top(2)
// returns Array(6, 5)
num k, the number of top elements to return
ord the implicit ordering for T
returns an array of top elements
</pre>
<br />
<b style="background-color: #999999;">Example</b><br />
In this example, we provide a custom implicit Ordering that reverses the comparison, so <span style="font-family: Courier New, Courier, monospace;">top(5)</span> returns the 5 smallest elements (in ascending order)<br />
<pre class="prettyprint lang-scala">scala> val inputrdd = sc.parallelize{ Seq(10, 4, 5, 3, 11, 2, 6) }
inputrdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:44
scala>
scala> implicit val sortIntegersByString = new Ordering[Int] {
| override def compare(a: Int, b: Int) = {
| //a.toString.compare(b.toString)
| if(a > b) {
| -1
| }else{
| +1
| }
| }
| }
sortIntegersByString: Ordering[Int] = $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anon$1@4be552af
scala> inputrdd.top(5)
res28: Array[Int] = Array(2, 3, 4, 5, 6)
</pre>
<br />
<span style="font-family: "courier new" , "courier" , monospace;">takeOrdered()</span> does the opposite of <span style="font-family: "courier new" , "courier" , monospace;">top()</span><br />
<br />
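A quick sketch of <span style="font-family: Courier New, Courier, monospace;">takeOrdered()</span>, assuming the default Ordering[Int] is in scope (ie outside the custom implicit defined above) :<br />
<pre class="prettyprint lang-scala">val freshrdd = sc.parallelize(Seq(10, 4, 5, 3, 11, 2, 6))
freshrdd.takeOrdered(3)  // Array(2, 3, 4) : the 3 smallest elements
</pre>
<br />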
<h3 style="text-align: left;">
<b><u><span style="color: #cc0000;">Reference</span></u></b></h3>
<br />
Learning Spark : 60<br />
<a href="http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD" target="_blank">http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD</a><br />
<br />
<br />
<br /></div>
<h2>foreach() Example</h2>
<div dir="ltr" style="text-align: left;" trbidi="on">
<span style="color: #c27ba0; font-family: "courier new" , "courier" , monospace;"><b>foreach()</b></span> is an action. Unlike other actions, foreach do not return any value. It simply operates on all the elements in the RDD. <span style="font-family: "courier new" , "courier" , monospace;">foreach()</span> can be used in situations, where we do not want to return any result, but want to initiate a computation. A good example is ; inserting elements in RDD into database. Let us look at an example for <span style="font-family: "courier new" , "courier" , monospace;">foreach()</span><br />
<br />
<pre class="prettyprint lang-scala">scala> val testData=Array(1,2,3)
testData: Array[Int] = Array(1, 2, 3)
scala> val inputrdd = sc.parallelize(testData)
inputrdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:23
scala>
scala> inputrdd.foreach{ x => {
| println(x)
| }}
3
1
2
scala>
scala> //Note : the println above has no
scala> //impact on the actual elements in the RDD
scala> inputrdd.collect()
res6: Array[Int] = Array(1, 2, 3)</pre>
<br />
<h3 style="text-align: left;">
<u><span style="color: red;">Reference</span></u></h3>
Learning Spark : Page 41</div>
<h2>mapPartitions() Example</h2>
<div dir="ltr" style="text-align: left;" trbidi="on">
<span style="color: #c27ba0; font-family: "courier new" , "courier" , monospace;"><b>mapPartitions()</b></span> can be used as an alternative to <span style="font-family: "courier new" , "courier" , monospace;">map()</span> & <span style="font-family: "courier new" , "courier" , monospace;">foreach()</span>. <span style="font-family: "courier new" , "courier" , monospace;">mapPartitions()</span> is called once for each Partition unlike <span style="font-family: "courier new" , "courier" , monospace;">map()</span> & <span style="font-family: "courier new" , "courier" , monospace;">foreach()</span> which is called for each element in the RDD. The main advantage being that, we can do initialization on Per-Partition basis instead of per-element basis(as done by <span style="font-family: "courier new" , "courier" , monospace;">map()</span> & <span style="font-family: "courier new" , "courier" , monospace;">foreach()</span>)<br />
<div>
<br /></div>
<div>
Consider the case of initializing a database. If we use <span style="font-family: Courier New, Courier, monospace;">map()</span> or <span style="font-family: Courier New, Courier, monospace;">foreach()</span>, the number of times we would need to initialize will be equal to the number of elements in the RDD. Whereas if we use <span style="font-family: Courier New, Courier, monospace;">mapPartitions()</span>, the number of times we would need to initialize would be equal to the number of Partitions</div>
<div>
<br /></div>
<div>
We get an <span style="font-family: Courier New, Courier, monospace;">Iterator</span> as an argument for <span style="font-family: Courier New, Courier, monospace;">mapPartitions()</span>, through which we can iterate through all the elements in a Partition.</div>
<div>
<br /></div>
<div>
In this example, we will use <span style="color: #c27ba0; font-family: Courier New, Courier, monospace;"><b>mapPartitionsWithIndex()</b></span>, which, in addition to behaving like <span style="font-family: Courier New, Courier, monospace;">mapPartitions()</span>, also provides an index to track the Partition number<br />
<br />
<span style="background-color: #999999;">Syntax</span></div>
<pre class="prettyprint lang-scala">def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.
preservesPartitioning indicates whether the input function preserves the partitioner, which should be false unless this is a pair RDD and the input function doesn't modify the keys.
</pre>
<div>
<br />
<span style="background-color: #999999;">Example</span></div>
In this example, we add partition no to each element of an RDD<br />
<pre class="prettyprint lang-scala">scala> val rdd1 = sc.parallelize(
| List(
| "yellow", "red",
| "blue", "cyan",
| "black"
| ),
| 3
| )
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:21
scala>
scala> val mapped = rdd1.mapPartitionsWithIndex{
| // 'index' represents the Partition No
| // 'iterator' to iterate through all elements
| // in the partition
| (index, iterator) => {
| println("Called in Partition -> " + index)
| val myList = iterator.toList
| // In a normal user case, we will do the
| // the initialization(ex : initializing database)
| // before iterating through each element
| myList.map(x => x + " -> " + index).iterator
| }
| }
mapped: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at mapPartitionsWithIndex at <console>:23
scala>
| mapped.collect()
Called in Partition -> 1
Called in Partition -> 2
Called in Partition -> 0
res7: Array[String] = Array(yellow -> 0, red -> 1, blue -> 1, cyan -> 2, black -> 2)
</pre>
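<br />
A sketch of the per-partition initialization pattern described above using plain <span style="font-family: Courier New, Courier, monospace;">mapPartitions()</span> ; createConnection() is a hypothetical helper standing in for real setup code :<br />
<pre class="prettyprint lang-scala">val processed = rdd1.mapPartitions { iterator =>
  // One-time, per-partition initialization would go here,
  // ex : val connection = createConnection()  (hypothetical)
  val tag = "processed : "
  iterator.map(x => tag + x)
}
processed.collect()
</pre>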
<br /></div>
<h2>aggregate() Example</h2>
<div dir="ltr" style="text-align: left;" trbidi="on">
Compared to <span style="font-family: Courier New, Courier, monospace;">reduce()</span> & <span style="font-family: Courier New, Courier, monospace;">fold()</span>, the <span style="font-family: Courier New, Courier, monospace;">aggregate()</span> function has the advantage that it can return a different Type vis-a-vis the RDD Element Type (ie the Input Element type)<br />
<br />
<span style="background-color: #999999;">Syntax</span><br />
<pre class="prettyprint lang-scala">def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U
Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.
</pre>
<br />
In this example, the RDD element type is <span style="font-family: "courier new" , "courier" , monospace;">(String, Int)</span> whereas the return type is <span style="font-family: "courier new" , "courier" , monospace;">Int</span><br />
<br />
<span style="background-color: #999999;">Example</span><br />
<pre class="prettyprint lang-scala">scala> val inputrdd = sc.parallelize(
| List(
| ("maths", 21),
| ("english", 22),
| ("science", 31)
| ),
| 3
| )
inputrdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[14] at parallelize at <console>:21
scala> inputrdd.partitions.size
res18: Int = 3
scala> val result = inputrdd.aggregate(3) (
| /*
| * This is a seqOp for merging T into a U
| * ie (String, Int) in into Int
| * (we take (String, Int) in 'value' & return Int)
| * Arguments :
| * acc : Reprsents the accumulated result
| * value : Represents the element in 'inputrdd'
| * In our case this of type (String, Int)
| * Return value
| * We are returning an Int
| */
| (acc, value) => (acc + value._2),
|
| /*
| * This is a combOp for merging two U's
| * (ie 2 Int)
| */
| (acc1, acc2) => (acc1 + acc2)
| )
result: Int = 86
</pre>
<br />
The result is calculated as follows,<br />
<br />
<b>Partition 1</b> : <span style="font-family: "courier new" , "courier" , monospace;">Sum(all Elements) + 3 (Zero value)</span><br />
<b>Partition 2</b> : <span style="font-family: "courier new" , "courier" , monospace;">Sum(all Elements) + 3 (Zero value)</span><br />
<b>Partition 3</b> : <span style="font-family: "courier new" , "courier" , monospace;">Sum(all Elements) + 3 (Zero value)</span><br />
<b>Result</b> = <span style="font-family: "courier new" , "courier" , monospace;">Partition1 + Partition2 + Partition3 + 3(Zero value)</span><br />
<br />
So we get <span style="font-family: "courier new" , "courier" , monospace;">21 + 22 + 31 + (4 * 3) = 86</span><br />
<br />
<h3 style="text-align: left;">
<b><u><span style="color: #cc0000;">Reference</span></u></b></h3>
<br />
Learning Spark : Page 40<br />
<br /></div>
<h2>reduce() & fold() Examples</h2>
<div dir="ltr" style="text-align: left;" trbidi="on">
<h3 style="text-align: left;">
<b><u><span style="color: #cc0000; font-size: x-large;">Sum</span></u></b></h3>
<br />
<div class="p1">
<b><span style="color: #c27ba0; font-family: "courier new" , "courier" , monospace;">reduce(</span><span style="color: #999999; font-family: "courier new" , "courier" , monospace;"><function type></span><span style="color: #c27ba0; font-family: "courier new" , "courier" , monospace;">)</span></b> takes a Function Type ; which takes 2 elements of RDD Element Type as argument & returns the Element of same type</div>
<div class="p1">
<br />
<b style="background-color: #cccccc;">Syntax</b></div>
<pre class="prettyprint lang-scala">def reduce(f: (T, T) ⇒ T): T
Reduces the elements of this RDD using the specified associative binary operator.
</pre>
<br />
<pre class="prettyprint lang-scala">scala> val rdd1 = sc.parallelize(List(1, 2, 5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:21
scala> val sum = rdd1.reduce{ (x, y) => x + y}
sum: Int = 8</pre>
<br />
<b><span style="color: #c27ba0; font-family: "courier new" , "courier" , monospace;">fold()</span></b> is similar to reduce except that it takes an '<b><span style="font-family: "courier new" , "courier" , monospace;">Zero value</span></b>'(Think of it as a kind of initial value) which will be used in the initial call on each Partition<br />
<br />
<b style="background-color: #cccccc;">Syntax</b><br />
<pre class="prettyprint lang-scala">def fold(zeroValue: T)(op: (T, T) ⇒ T): T
Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral "zero value". The function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.
</pre>
<span style="font-family: "courier new" , "courier" , monospace;"><span style="background-color: yellow;">(</span><span style="background-color: yellow;">??</span> : Modifying t1 is not working..although this example works)</span><br />
<span style="font-family: "courier new" , "courier" , monospace;">(<span style="background-color: yellow;">??</span> : What does 'op:' denotes. In some functions(ex : reduce) the function type is mentioned as 'f:')</span><br />
<br />
<pre class="prettyprint lang-scala">scala> val rdd1 = sc.parallelize(List(
| ("maths", 80),
| ("science", 90)
| ))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[8] at parallelize at <console>:21
scala> rdd1.partitions.length
res8: Int = 8
scala> val additionalMarks = ("extra", 4)
additionalMarks: (String, Int) = (extra,4)
scala> val sum = rdd1.fold(additionalMarks){ (acc, marks) =>
| val sum = acc._2 + marks._2
| ("total", sum)
| }
sum: (String, Int) = (total,206)
</pre>
<br />
Note here the Partition length is <span style="font-family: Courier New, Courier, monospace;">8</span>. So in each partition, after the RDD elements are added, the '<span style="font-family: Courier New, Courier, monospace;">Zero value of 4</span>' is also added (ie <span style="font-family: Courier New, Courier, monospace;">8 * 4 = 32</span>). When the results of all the partitions are combined, once again a value of <span style="font-family: Courier New, Courier, monospace;">4</span> is added to the result. Therefore we have <span style="font-family: Courier New, Courier, monospace;">((8 * 4) + 4)</span>, totalling <span style="font-family: Courier New, Courier, monospace;"><b>36</b></span>.<br />
<br />
Therefore <span style="font-family: "courier new" , "courier" , monospace;">result = 80 + 90 + 36 = 206</span><br />
<br />
<b style="background-color: #cccccc;">Disadvantage</b> : This disadvantage of both <span style="font-family: "courier new" , "courier" , monospace;">reduce()</span> & <span style="font-family: "courier new" , "courier" , monospace;">fold()</span> is, the return type should be the same as the RDD element type. <b><span style="color: #c27ba0; font-family: "courier new" , "courier" , monospace;">aggregate()</span></b> function can be used to avoid this limitation.<br />
<br />
<b style="background-color: #cccccc;">Note</b> : <span style="color: #c27ba0; font-family: "courier new" , "courier" , monospace;"><b>foldByKey()</b></span> is very similar to <span style="font-family: "courier new" , "courier" , monospace;">fold()</span> except that it operates on a Pair RDD<br />
<br />
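A sketch of foldByKey() (with a zero value of 0, the per-partition applications of the zero value do not affect the sums) :<br />
<pre class="prettyprint lang-scala">val marks = sc.parallelize(Seq(("maths", 80), ("maths", 10), ("science", 90)))
marks.foldByKey(0)((a, b) => a + b).collect()
// Array((maths,90), (science,90)) in some order
</pre>
<br />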
<h3 style="text-align: left;">
<span style="color: #cc0000; font-size: x-large;"><u>Reference</u></span></h3>
<br />
Learning Spark : Page 38<br />
<a href="http://blog.madhukaraphatak.com/spark-rdd-fold/" target="_blank"><span style="font-family: "courier new" , "courier" , monospace;">http://blog.madhukaraphatak.com/spark-rdd-fold/</span></a><br />
<br /></div>
<h2>distinct(), union() & more...</h2>
<div dir="ltr" style="text-align: left;" trbidi="on">
<div dir="ltr" style="text-align: left;" trbidi="on">
In this post, will look at the following Pseudo set Transformations<br />
<ol style="text-align: left;">
<li><span style="font-family: "courier new" , "courier" , monospace;">distinct()</span></li>
<li><span style="font-family: "courier new" , "courier" , monospace;">union()</span></li>
<li><span style="font-family: "courier new" , "courier" , monospace;">intersection()</span></li>
<li><span style="font-family: "courier new" , "courier" , monospace;">subtract()</span></li>
<li><span style="font-family: "courier new" , "courier" , monospace;">cartesian()</span></li>
</ol>
<pre class="prettyprint lang-scala">scala> //Reference : Learning Spark (Page 38)
scala> val rdd1 = sc.parallelize(List("lion", "tiger", "tiger", "peacock", "horse"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[33] at parallelize at <console>:21
scala> val rdd2 = sc.parallelize(List("lion", "tiger"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[34] at parallelize at <console>:21
scala> // distinct() : Returns the distinct elements in the RDD
scala> // Warning : Involves shuffling of data over the N/W
scala> rdd1.distinct().collect()
res20: Array[String] = Array(peacock, lion, horse, tiger)
scala> // union() : Returns an RDD containing data from both sources
scala> // Note : Unlike the Mathematical Union, duplicates are
scala> // not removed. Also the type should be the same in both RDDs
scala> rdd1.union(rdd2).collect()
res22: Array[String] = Array(lion, tiger, tiger, peacock, horse, lion, tiger)
scala> // intersection() : Returns elements that are common b/w both
scala> // RDDs. Also removes duplicates
scala> // Warning : Involves shuffling & has the worst performance
scala> rdd1.intersection(rdd2).collect();
res24: Array[String] = Array(lion, tiger)
scala> // subtract() : Returns only elements that are present in the
scala> // first RDD
scala> rdd1.subtract(rdd2).collect()
res26: Array[String] = Array(peacock, horse)
scala> // cartesian(): Provides cartesian product b/w 2 RDDs
scala> // Warning : Is very expensive for large RDDs
scala> rdd1.cartesian(rdd2).collect();
res28: Array[(String, String)] = Array((lion,lion), (lion,tiger), (tiger,lion), (tiger,tiger), (tiger,lion), (tiger,tiger), (peacock,lion), (peacock,tiger), (horse,lion), (horse,tiger))
</pre>
<br /></div>
</div>
<h2>Serialization</h2>
<div dir="ltr" style="text-align: left;" trbidi="on">
Most Transformations & some Actions are higher-order functions, ie they take another function as an argument<br />
<div>
<br /></div>
<div>
<span style="font-family: "courier new" , "courier" , monospace;">Ex : <span style="color: #6aa84f;"><span class="s1">val</span><span class="s2"> </span>filteredRDD<span class="s2"> = </span>inputRDD<span class="s2">.</span><span class="s3"><b>filter</b></span><span class="s2">(</span><b><span class="s4">x</span><span class="s2"> => </span><span class="s4">x</span><span class="s2"> </span><span class="s3">!=</span><span class="s2"> </span><span class="s5">5</span></b><span class="s2">)</span></span></span></div>
<div>
<span class="s2"><br /></span></div>
<div>
<span class="s2">Here the transformation <span style="font-family: "courier new" , "courier" , monospace;">filter()</span> is taking another function(in this case Function Literal / Anonymous function) as an argument. As this function argument gets executed in different Executors, it is imperative, these function argument are serializable, so that they can be transferred to the Executors. </span></div>
<div>
<span class="s2"><br /></span></div>
<div>
<span class="s2">If the function we pass as an argument is non-serializable, then <b><span style="color: #c27ba0; font-family: "courier new" , "courier" , monospace;">NotSerializableException</span></b> occurs</span></div>
<div>
<span class="s2"><br /></span></div>
<h3 style="text-align: left;">
<span class="s2"><b><span style="color: #cc0000;"><u>Member Variable & Methods</u></span></b></span></h3>
<div>
<span class="s2"><br /></span>
<span class="s2">Passing a <u>Member Variable</u> (or) <u>Method</u> of an Object to a Transformation (or) Action results in a complete reference to the object, because of which Spark will transfer the <b><u>Complete object</u></b> to the Executors in which it(The Member variable or Method) will be referenced. This results in unnecessary transfer of data(ie.. in order to access one variable/method we end up transferring the entire object). In order to avoid this, <b><u><span style="color: #c27ba0;">always copy the Member variable/method into a local variable</span></u></b> before passing it as an argument to the Transformation (or) Action</span><br />
<br /></div>
<div>
<pre class="prettyprint lang-scala linenums">class Append(val suffix: String) {
def appendSuffixBad(inputRDD: RDD[String]): RDD[String] = {
// Bad As we are referring the Member variable 'suffix'
// the whole object will be serialized & transferred
// to the Executor(so that 'suffix' can be referred)
inputRDD.map { x => x + suffix }
}
def appendSuffixGood(inputRDD: RDD[String]): RDD[String] = {
// Good Here we copied the Member variable 'suffix' into a
// local variable, so only the local variable is transferred
// to the executor
val localsuffix = suffix;
inputRDD.map { x => x + localsuffix }
}
}</pre>
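<br />
A brief usage sketch of the class above :<br />
<pre class="prettyprint lang-scala">val append = new Append("-suffix")
val animals = sc.parallelize(List("lion", "tiger"))
append.appendSuffixGood(animals).collect()
// Array(lion-suffix, tiger-suffix)
</pre>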
</div>
<div>
<span class="s2"><br /></span>
<h3 style="text-align: left;">
<span class="s2"><b><span style="color: #cc0000;"><u>Reference</u></span></b></span></h3>
</div>
<div>
<br /></div>
<div>
Learning Spark : Page 32</div>
<div>
<br /></div>
</div>