31 May 2015

RDD Persistence

RDDs are Recomputed


By default, an RDD is recomputed each time an action is run on it. For example:

scala> val lines = sc.textFile("words.txt")
...
scala> lines.first()
res4: String = line1 word1
scala> lines.count()
res5: Long = 4

Here, the action first() triggers computation of the RDD 'lines'. When we then call another action, count(), on the same RDD, the RDD is recomputed from the source.
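One way to observe the recomputation is to count how often a map function runs. The following is a minimal sketch for spark-shell, assuming Spark 2.0 or later (where sc.longAccumulator is available). The sample data and the names evaluations, words and tagged are made up for illustration. Accumulator updates made inside a transformation can be applied more than once if a task is retried, so treat the number as indicative.

```scala
// Run inside spark-shell, where `sc` is the SparkContext created by the shell.
// Assumption: Spark 2.0+ (for sc.longAccumulator).
val evaluations = sc.longAccumulator("evaluations")

// Hypothetical in-memory data standing in for words.txt
val words = sc.parallelize(Seq("line1 word1", "line2 word2", "line3 word3", "line4 word4"))

// Every element that passes through map bumps the counter
val tagged = words.map { line => evaluations.add(1); line }

tagged.count()   // first action: all 4 elements are computed
tagged.count()   // second action: all 4 elements are computed again

// Should print 8 (2 actions x 4 elements) if no task was retried
println(evaluations.sum)
```

Two calls to count() are used rather than first(), because first() only needs to read enough data to return one element.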

Persisting RDDs


The default behavior of recomputing an RDD on each action can be overridden by persisting the RDD, so that it is not recomputed every time an action is called on it. When an RDD is persisted, each node that computes it stores its partitions of the RDD, and later actions reuse the stored partitions.

We use the persist() method to persist an RDD. In Scala and Java, persist() by default stores the data in the JVM as unserialized objects. In Python, calling persist() serializes the data before storing it. Storage levels that combine memory and disk are also available.

scala> val lines = sc.textFile("words.txt")
...
scala> import org.apache.spark.storage.StorageLevel
...
scala> lines.persist(StorageLevel.MEMORY_ONLY) // We can also use the cache() method if we need the MEMORY_ONLY storage level
...
scala> lines.count() // (1)
...

The actual persistence takes place during the first action call on the RDD, marked (1) above. Spark provides multiple storage options (memory, disk, or both) for persisting the data, as well as replication levels. More information can be found in the Spark documentation.
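The lazy behavior described above can be checked with a counter. This is a minimal sketch for spark-shell, assuming Spark 2.0 or later (for sc.longAccumulator). The sample data and the names evaluations, words and tagged are hypothetical, and MEMORY_AND_DISK is picked only as an example of a level other than the default.

```scala
// Run inside spark-shell, where `sc` is the SparkContext created by the shell.
// Assumption: Spark 2.0+ (for sc.longAccumulator).
import org.apache.spark.storage.StorageLevel

val evaluations = sc.longAccumulator("evaluations")
val words = sc.parallelize(Seq("line1 word1", "line2 word2", "line3 word3", "line4 word4"))
val tagged = words.map { line => evaluations.add(1); line }

// Keep partitions in memory; partitions that do not fit go to disk
tagged.persist(StorageLevel.MEMORY_AND_DISK)

// The storage level is recorded right away, but nothing is computed yet
println(tagged.getStorageLevel == StorageLevel.MEMORY_AND_DISK)
println(evaluations.sum)

tagged.count()   // first action computes the partitions and stores them
tagged.count()   // second action reads the stored partitions

// Should print 4 rather than 8 if no task was retried
println(evaluations.sum)
```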

We use unpersist() to remove a persisted RDD from the cache. When the cached data exceeds the memory capacity, Spark automatically evicts old partitions (they will be recomputed when needed). This is called the Least Recently Used (LRU) cache policy.
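As a small illustration of the persist and unpersist life cycle, the sketch below tracks the storage level of an RDD in spark-shell. The sample data and the name nums are made up for the example.

```scala
// Run inside spark-shell, where `sc` is the SparkContext created by the shell.
import org.apache.spark.storage.StorageLevel

val nums = sc.parallelize(1 to 1000)   // hypothetical sample data

// A fresh RDD has no storage level assigned
println(nums.getStorageLevel == StorageLevel.NONE)

nums.cache()    // shorthand for persist(StorageLevel.MEMORY_ONLY)
nums.count()    // the first action materializes the cached partitions
println(nums.getStorageLevel == StorageLevel.MEMORY_ONLY)

nums.unpersist()   // drop the cached partitions
println(nums.getStorageLevel == StorageLevel.NONE)
```

Calling unpersist() explicitly frees the space as soon as the RDD is no longer needed, instead of waiting for eviction.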