14 November 2015

Serialization

Most transformations and some actions are higher-order functions, so they take another function as an argument.

Ex : val filteredRDD = inputRDD.filter(x => x != 5)

Here the transformation filter() takes another function (in this case a function literal, also called an anonymous function) as its argument. Because this function is executed on the executors rather than on the driver, it must be serializable: Spark serializes it on the driver and sends it to the executors as part of each task.
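The following is a minimal sketch, outside Spark, of what "sending the function to the executors" involves. The predicate from the filter() example is serialized to bytes with plain Java serialization (the serializer Spark uses for closures by default), rebuilt from those bytes as an executor would, and then applied. It assumes Scala 2.12 or later, where function literals compile to serializable lambdas. ClosureRoundTrip, toBytes and fromBytes are hypothetical names for illustration. A real Spark job does this through its own closure serializer, not through helpers like these.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical helper object; not part of any Spark API.
object ClosureRoundTrip {
  // Driver side (roughly): turn a function into bytes with Java serialization.
  def toBytes(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  // Executor side (roughly): rebuild the function from those bytes.
  def fromBytes[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // The same predicate that is passed to filter() in the example above.
  val predicate: Int => Boolean = x => x != 5

  def main(args: Array[String]): Unit = {
    val shipped = fromBytes[Int => Boolean](toBytes(predicate))
    println(List(1, 5, 7).filter(shipped)) // List(1, 7)
  }
}
```

Running main prints List(1, 7). The rebuilt function behaves like the original because a function literal that captures nothing needs no other state to travel with it.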

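If the function passed as an argument (or anything it refers to) is not serializable, Spark fails with org.apache.spark.SparkException: Task not serializable, whose cause is a java.io.NotSerializableException.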
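This minimal Spark-free sketch shows how that exception arises, assuming Scala 2.12 or later. Serializing a function also serializes everything it captures, so a function literal that captures a non-serializable object cannot be serialized. Connection, usesConnection, usesHostOnly and canSerialize are hypothetical names used only for illustration. Inside Spark, the same failure would be reported as Task not serializable.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object NotSerializableDemo {
  // Hypothetical class standing in for something like a DB connection;
  // it deliberately does NOT extend Serializable.
  class Connection(val host: String)

  // True if Java serialization accepts obj, false if it throws
  // NotSerializableException (the cause behind "Task not serializable").
  def canSerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // The returned function captures the Connection object itself.
  def usesConnection(conn: Connection): String => String =
    s => s + "@" + conn.host

  // The returned function captures only the host String.
  def usesHostOnly(conn: Connection): String => String = {
    val host = conn.host
    s => s + "@" + host
  }

  def main(args: Array[String]): Unit = {
    val conn = new Connection("db.example") // hypothetical host name
    println(canSerialize(usesConnection(conn))) // false
    println(canSerialize(usesHostOnly(conn)))   // true
  }
}
```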

Member Variables & Methods


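Referring to a member variable or method of an object inside a function passed to a transformation or action makes that function hold a reference to the whole object (this). Spark therefore serializes the complete object and transfers it to every executor where the function runs. This results in unnecessary data transfer (to access one variable or method, we end up shipping the entire object), and if the object's class is not serializable the job fails with the NotSerializableException described above. To avoid this, copy the member variable into a local variable and refer only to the local variable inside the function. For a method, assigning it to a local variable does not help, because the resulting function value still refers to the object. Instead, move the logic into a function defined in a top-level object, or into a local function literal that uses only local values; passing local serializable variables or functions that are members of a top-level object is always safe.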

import org.apache.spark.rdd.RDD

class Append(val suffix: String) {

  def appendSuffixBad(inputRDD: RDD[String]): RDD[String] = {
    // Bad: the closure reads the member variable 'suffix' (really this.suffix),
    //      so it captures 'this'. Spark must serialize the whole Append object
    //      and ship it to the executors just so 'suffix' can be read.
    //      Since Append is not Serializable, this actually fails with
    //      "Task not serializable" (caused by java.io.NotSerializableException).
    inputRDD.map { x => x + suffix }
  }

  def appendSuffixGood(inputRDD: RDD[String]): RDD[String] = {
    // Good: copy the member variable 'suffix' into a local val first, so the
    //       closure captures only that String and is the only thing
    //       transferred to the executors.
    val localSuffix = suffix
    inputRDD.map { x => x + localSuffix }
  }

}
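The Append example can be checked without a cluster. The sketch below is a hypothetical Spark-free stand-in: AppendClosures, TextOps, AppendCheck and canSerialize are illustrative names, not Spark API. Instead of calling map, each method returns the function it would pass to map, and Java serialization is used to see what that function drags along. It assumes Scala 2.12 or later. It also covers the method case: calling an instance method from the closure captures the whole object just like reading a field, while calling a function defined in a top-level object does not.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for Append: each method returns the closure it
// would pass to map(). Like Append, it deliberately does NOT extend Serializable.
class AppendClosures(val suffix: String) {
  private def shout(x: String): String = x.toUpperCase

  // Reads the field 'suffix' (this.suffix), so the closure captures 'this'.
  def badField: String => String = x => x + suffix

  // Copies the field into a local val, so only the String is captured.
  def goodField: String => String = {
    val localSuffix = suffix
    x => x + localSuffix
  }

  // Calls an instance method, so the closure captures 'this' as well.
  def badMethod: String => String = x => shout(x)

  // Calls a function that lives in a top-level object: 'this' is not captured.
  def goodMethod: String => String = x => TextOps.shout(x)
}

// Top-level object holding the helper logic.
object TextOps {
  def shout(x: String): String = x.toUpperCase
}

object AppendCheck {
  // True if Java serialization accepts obj, false on NotSerializableException.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val a = new AppendClosures("!")
    println(canSerialize(a.badField))        // false
    println(canSerialize(a.goodField))       // true
    println(canSerialize(a.badMethod))       // false
    println(canSerialize(a.goodMethod))      // true
    println(List("a", "b").map(a.goodField)) // List(a!, b!)
  }
}
```

The two "bad" closures fail for the same reason: whatever the closure touches through 'this' forces the whole AppendClosures instance into the serialized bytes.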

Reference


Learning Spark : Page 32