05 December 2015

Setting the Number of Partitions

Shuffle transformations such as reduceByKey() accept the number of partitions
as an optional argument, and repartition() and coalesce() change it after the
fact. The REPL session below walks through each option.

Example
scala> val inputrdd = sc.parallelize(Seq(
     |                ("key1", 1),
     |                ("key2", 2),
     |                ("key1", 3)))
inputrdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[80] at parallelize at <console>:21

scala> val noPartition = inputrdd.reduceByKey((x, y) => x + y)
noPartition: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[81] at reduceByKey at <console>:23

scala> noPartition.partitions.length
res50: Int = 8
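
Nothing in the code asked for 8 partitions; when no count is given, Spark
falls back to spark.default.parallelism, which in local mode usually equals
the number of cores on the machine. A quick way to check (a minimal sketch;
the value is specific to this machine):

// Default partition count used when none is specified
val defaultPartitions = sc.defaultParallelism   // 8 here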

scala> //Here the number of partitions is given as the second argument
scala> val withPartition = inputrdd.reduceByKey((x, y) => x + y, 11)
withPartition: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[82] at reduceByKey at <console>:23

scala> withPartition.partitions.length
res51: Int = 11
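
Passing 11 is shorthand for supplying a HashPartitioner with 11 partitions;
reduceByKey() also has an overload that takes a Partitioner directly. A
sketch of the equivalent call (the name withPartitioner is illustrative):

import org.apache.spark.HashPartitioner

// Equivalent to reduceByKey((x, y) => x + y, 11)
val withPartitioner = inputrdd.reduceByKey(new HashPartitioner(11), (x, y) => x + y)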

scala> val repartitioned = withPartition.repartition(16)
repartitioned: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[86] at repartition at <console>:25

scala> repartitioned.partitions.length
res52: Int = 16
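
repartition() always performs a full shuffle, which is what lets the
partition count grow from 11 to 16; internally it is just coalesce() with
shuffle enabled. A sketch of the equivalent call:

// repartition(16) is shorthand for coalesce(16, shuffle = true)
val reshuffled = withPartition.coalesce(16, shuffle = true)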

scala> val coalesced = if(4 < repartitioned.partitions.length) {
     |    //Note : Use coalesce() only when the new number of partitions
     |    //       is less than the current number of partitions of the RDD
     |    repartitioned.coalesce(4)
     | } else {
     |    repartitioned
     | }
coalesced: org.apache.spark.rdd.RDD[(String, Int)] = CoalescedRDD[87] at coalesce at <console>:30

scala> coalesced.partitions.length
res53: Int = 4
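
Without the shuffle flag, coalesce() can only merge existing partitions, so
asking for more partitions than the RDD currently has is a no-op. A small
sketch (the name stillFour is illustrative):

// coalesce() without shuffle cannot increase the partition count
val stillFour = coalesced.coalesce(20)
stillFour.partitions.length   // still 4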

Reference

Learning Spark (Karau, Konwinski, Wendell & Zaharia, O'Reilly, 2015), page 57