06 December 2015

sortByKey() Example

sortByKey() is part of OrderedRDDFunctions, which works on key/value pairs. The official documentation for OrderedRDDFunctions states:
class OrderedRDDFunctions[K, V, P <: Product2[K, V]] extends Logging with Serializable
Extra functions available on RDDs of (key, value) pairs where the key is sortable through an implicit conversion. They will work with any key type K that has an implicit Ordering[K] in scope. Ordering objects already exist for all of the standard primitive types. Users can also define their own orderings for custom types, or to override the default ordering. The implicit ordering that is in the closest scope will be used.
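To make the point about custom orderings concrete, here is a minimal sketch in plain Scala (no Spark required). The `Student` key type and the `byGradeThenName` ordering are hypothetical names introduced only for illustration. The standard-library `sorted` method resolves an implicit `Ordering` in the same way; with an RDD keyed by `Student` and this implicit in scope, sortByKey() would be expected to pick up the same ordering.

```scala
// Hypothetical key type, used only for illustration.
case class Student(name: String, grade: Int)

object StudentOrderingSketch {
  // Custom ordering: compare by grade first, then by name.
  // Ordering.by derives it from the built-in ordering on (Int, String) tuples.
  implicit val byGradeThenName: Ordering[Student] =
    Ordering.by((s: Student) => (s.grade, s.name))

  val students: Seq[Student] =
    Seq(Student("bob", 2), Student("alice", 2), Student("carol", 1))

  // `sorted` looks up the implicit Ordering[Student] defined above.
  val ordered: Seq[Student] = students.sorted
}
```

Here `StudentOrderingSketch.ordered` lists carol first (grade 1), then alice and bob (both grade 2, ordered by name).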
Syntax : sortByKey()
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling collect or save on the resulting RDD will return or output an ordered list of records (in the save case, they will be written to multiple part-X files in the filesystem, in order of the keys).
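The "sorted range per partition" behaviour can be inspected with glom(), which turns each partition into an array. The following is a rough sketch, not a definitive program: the object name, the local master setting, the application name, and the sample data are all assumptions made for a standalone run. In spark-shell, the existing `sc` can be passed to `partitionContents` directly. On Spark versions before 1.3, an additional `import org.apache.spark.SparkContext._` is needed for the pair-RDD implicits.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SortByKeyPartitionsSketch {
  // Sorts a small pair RDD into (at most) two partitions and returns
  // the contents of each partition, in partition order.
  def partitionContents(sc: SparkContext): Array[Array[(String, Int)]] = {
    val rdd = sc.parallelize(Seq(
      ("math", 55), ("english", 57), ("science", 59), ("art", 60)))
    rdd.sortByKey(ascending = true, numPartitions = 2)
      .glom()      // one array per partition
      .collect()
  }

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions for running outside spark-shell.
    val conf = new SparkConf().setMaster("local[2]").setAppName("sortByKey-partitions")
    val sc = new SparkContext(conf)
    try {
      partitionContents(sc).zipWithIndex.foreach { case (part, i) =>
        println(s"partition $i: ${part.mkString(", ")}")
      }
    } finally {
      sc.stop()
    }
  }
}
```

Where the partition boundaries fall depends on how the range partitioner samples the keys, so the exact split may vary. Reading the partitions in order should always give the keys in ascending order.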
Example
scala> //Input
scala> val rdd = sc.parallelize(Seq(
     |                ("math",    55),
     |                ("math",    56),
     |                ("english", 57),
     |                ("english", 58),
     |                ("science", 59),
     |                ("science", 54)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[17] at parallelize at :21

scala> rdd.collect()
res8: Array[(String, Int)] = Array((math,55), (math,56), (english,57), (english,58), (science,59), (science,54))

scala> //Default Sorting : Ascending order
scala> val sorted1 = rdd.sortByKey()
sorted1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[20] at sortByKey at :23

scala> //Result
scala> sorted1.collect()
res9: Array[(String, Int)] = Array((english,57), (english,58), (math,55), (math,56), (science,59), (science,54))

scala> //Custom Sorting : Descending order (using implicit 'Ordering')
scala> {
     |    //Define an implicit Ordering[String] that sortByKey() will pick up
     |    //The enclosing '{ }' block limits the scope of the implicit ordering
     |    implicit val descendingStringOrdering: Ordering[String] = new Ordering[String] {
     |       override def compare(a: String, b: String): Int = {
     |          val result = a.compare(b)
     |          //Negate the result to sort the keys in descending order
     |          -result
     |       }
     |    }
     |    val sorted2 = rdd.sortByKey()
     |
     |    //Result
     |    sorted2.collect()
     | }
res10: Array[(String, Int)] = Array((science,59), (science,54), (math,55), (math,56), (english,57), (english,58))

scala> //Default Sorting : Descending order (done using the 'ascending' flag argument)
scala> val sorted3 = rdd.sortByKey(false)
sorted3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[26] at sortByKey at :23

scala> //Result
scala> sorted3.collect()
res11: Array[(String, Int)] = Array((science,59), (science,54), (math,55), (math,56), (english,57), (english,58))
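The hand-written descending ordering in the transcript above can also be derived from the default one with `Ordering[String].reverse`. The sketch below uses plain Scala collections rather than an RDD so it runs without Spark; the object and value names are hypothetical.

```scala
object ReverseOrderingSketch {
  val keys: Seq[String] = Seq("math", "english", "science")

  // Hand-written descending comparison, similar to the one in the transcript.
  val manual: Ordering[String] = new Ordering[String] {
    override def compare(a: String, b: String): Int = b.compareTo(a)
  }

  // The same ordering, derived by reversing the default Ordering[String].
  val reversed: Ordering[String] = Ordering[String].reverse

  // Passing the ordering explicitly avoids relying on implicit scope.
  val byManual: Seq[String] = keys.sorted(manual)
  val byReversed: Seq[String] = keys.sorted(reversed)
}
```

Both results list the keys as science, math, english. For a plain descending sort, the `ascending = false` flag is the simplest option. A custom or reversed `Ordering` is mainly useful when the comparison itself needs to change.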

Note : sortByKey() returns a range-partitioned RDD. It shuffles the data with a RangePartitioner, so each partition holds a contiguous range of keys.
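One way to check this is to inspect the `partitioner` field of the sorted RDD. The sketch below assumes a local master and uses hypothetical object, method, and application names. In spark-shell, calling `sorted1.partitioner` on the RDD from the example is the quickest equivalent.

```scala
import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}

object RangePartitionCheck {
  // Returns true when the RDD produced by sortByKey() carries a RangePartitioner.
  def isRangePartitioned(sc: SparkContext): Boolean = {
    val rdd = sc.parallelize(Seq(("math", 55), ("english", 57), ("science", 59)))
    val sorted = rdd.sortByKey()
    // `partitioner` is an Option[Partitioner]; it is None for unpartitioned RDDs.
    sorted.partitioner.exists(_.isInstanceOf[RangePartitioner[_, _]])
  }

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions for running outside spark-shell.
    val conf = new SparkConf().setMaster("local[2]").setAppName("range-partition-check")
    val sc = new SparkContext(conf)
    try {
      println(s"range partitioned: ${isRangePartitioned(sc)}")
    } finally {
      sc.stop()
    }
  }
}
```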

Reference


Learning Spark : Range Partition : 64