14 November 2015

A Word Count Application

In this example, we will count the number of occurrences of each unique word in a given file.

package sbook.helloworld

import org.apache.spark.{SparkConf, SparkContext}
//Needed for the reduceByKey implicit conversion (explained below)
import org.apache.spark.SparkContext._

object WordCount {
   def main(args: Array[String]) {

      println("In main : " + args(0) + "," + args(1))

      //Create Spark Context
      val conf = new SparkConf().setAppName("WordCountApp")
      val sc = new SparkContext(conf)

      //Load Data from File
      val input = sc.textFile(args(0))

      //Split into words
      val words = input.flatMap(line => line.split(" "))

      //Assign unit to each word
      val units = words.map ( word => (word, 1) )

      //Reduce each key
      val counts = units.reduceByKey ( (x, y) => x + y )

      //Write output to Disk
      counts.saveAsTextFile(args(1))

      //Shut down Spark. System.exit(0) or sys.exit() can also be used
      sc.stop()
   }
}

'object'

object WordCount {
In Scala, we use the keyword object to create a singleton. A class is created using the keyword class.
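
As a quick illustration, here is a minimal sketch contrasting the two (Greeter and GreeterApp are hypothetical names used only for this example):

class Greeter(name: String) {            // class: a blueprint, instantiated with new
   def greet(): String = "Hello, " + name
}

object GreeterApp {                      // object: a singleton, exactly one instance
   def main(args: Array[String]) {
      val g = new Greeter("Spark")       // each new creates a fresh Greeter instance
      println(g.greet())                 // prints: Hello, Spark
   }
}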

'def'


def main(args: Array[String]) {
A method in Scala is defined using the keyword def. A function defined inside a class/object is called a method. As a quick illustration, here is a standalone method with explicit parameter and return types (add is a hypothetical name; the snippet is runnable in the Scala REPL):
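
def add(x: Int, y: Int): Int = x + y

add(2, 3)  // returns 5

'val' & Type Inference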

val sc = new SparkContext(conf)
We are creating a SparkContext to access the cluster. In Scala, the keyword val specifies immutability, which means we cannot change the value of sc once it is defined. A mutable variable can be created with the keyword var. Using immutable values (val) is the preferred approach in Scala.

A val or var is always associated with a type. If we do not specify the type explicitly (as in this case), the Scala compiler will infer the type from the data. This is called type inference.
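
Here is a small sketch of both ideas, runnable in the Scala REPL (the names are hypothetical):

val language = "Scala"   // no type written; the compiler infers String
// language = "Java"     // compile error: reassignment to val

var count = 0            // the compiler infers Int; var allows reassignment
count = count + 1        // fine, count is now 1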

Higher Order Functions & Function Literals


val words = input.flatMap(line => line.split(" "))
In Scala, an argument to a method can be either a value or a function. A function that takes another function as an argument is called a higher-order function. Here the method flatMap is a higher-order function, and we are passing it an anonymous function (a function without a name) as the argument. In Scala, anonymous functions are called function literals; in Java, they are called lambdas.
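
To see what flatMap does, here is a small sketch on a plain Scala collection, with hypothetical sample lines (no Spark required):

object FlatMapDemo {
   def main(args: Array[String]) {
      val lines = List("line1 word1 word2", "line2 word3")

      // map produces one element per line: a List of Array[String]
      val perLine = lines.map(line => line.split(" "))
      println(perLine.map(_.toList)) // List(List(line1, word1, word2), List(line2, word3))

      // flatMap flattens the per-line arrays into a single List of words
      val words = lines.flatMap(line => line.split(" "))
      println(words) // List(line1, word1, word2, line2, word3)
   }
}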

Implicit Conversion


val units = words.map ( word => (word, 1) )
val counts = units.reduceByKey ( (x, y) => x + y )
Here units is of type RDD. As can be seen in the API documentation, the type RDD does not have the method reduceByKey. Why didn't the Scala compiler throw an error? The reason is implicit conversion. When the compiler finds an unknown field or method on the current instance (units), it converts the current type (in our case RDD) into another type which has this method (reduceByKey) defined. In our case the compiler converts the type RDD to the type PairRDDFunctions (which has the method reduceByKey defined) so that reduceByKey can be called.

In order for this implicit conversion to work, we need to import org.apache.spark.SparkContext._
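
The same mechanism can be demonstrated outside Spark. Here is a minimal sketch (RichCount and intToRichCount are hypothetical names used only for this illustration):

import scala.language.implicitConversions

class RichCount(n: Int) {
   def doubled: Int = n * 2
}

object ImplicitDemo {
   // Tells the compiler how to turn an Int into a RichCount when needed
   implicit def intToRichCount(n: Int): RichCount = new RichCount(n)

   def main(args: Array[String]) {
      // Int has no method called doubled, so the compiler rewrites the call
      // to intToRichCount(21).doubled, just as RDD becomes PairRDDFunctions
      println(21.doubled) // prints 42
   }
}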

Here is the input data we are going to test with:

mountain@mountain:~/sbook$ cat src/main/resources/data.txt 
line1 word1 word2
line2 word3
line3 word1

Let us compile our code and package it into a jar:

mountain@mountain:~/sbook$ sbt package
...
[info] Packaging /home/mountain/workspace/allgitrepositories/ApacheSparkBook/target/scala-2.11/sparkbookapp_2.11-1.0.jar ...

Submit the Application to Spark

spark-submit --class sbook.helloworld.WordCount target/scala-2.11/sparkbookapp_2.11-1.0.jar src/main/resources/data.txt  output

Now we have the result available in the output directory:

mountain@mountain:~/sbook$ ls output
part-00000  part-00001  _SUCCESS

mountain@mountain:~/sbook$ cat output/*
(word2,1)
(line2,1)
(word3,1)
(line3,1)
(line1,1)
(word1,2)

That ends our first Scala application in Spark.