In this example, we will count the number of occurrences of each unique word in a given file. Here is the complete application:
package sbook.helloworld

import org.apache.spark.{SparkConf, SparkContext}
// Brings in the implicit conversion to PairRDDFunctions needed for reduceByKey (explained below)
import org.apache.spark.SparkContext._

object WordCount {
  def main(args: Array[String]) {
    println("In main : " + args(0) + "," + args(1))
    // Create the Spark context
    val conf = new SparkConf().setAppName("WordCountApp")
    val sc = new SparkContext(conf)
    // Load data from file
    val input = sc.textFile(args(0))
    // Split into words
    val words = input.flatMap(line => line.split(" "))
    // Assign unit to each word
    val units = words.map ( word => (word, 1) )
    // Reduce each key
    val counts = units.reduceByKey ( (x, y) => x + y )
    // Write output to disk
    counts.saveAsTextFile(args(1))
    // Shutdown Spark. System.exit(0) or sys.exit() can also be used
    sc.stop()
  }
}
'object'
object WordCount {
In Scala we use the keyword object to create a singleton. A class can be created using the keyword class.
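As a quick illustration (Counter and Point are made-up names, not part of the WordCount code):
object Counter {                        // a singleton: exactly one instance, accessed by its name
  var count = 0
  def increment(): Unit = { count += 1 }
}
class Point(val x: Int, val y: Int)     // a class: new instances are created with new
Counter.increment()
val p = new Point(1, 2)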
'def'
def main(args: Array[String]) {
A method in Scala is defined using the keyword def. A function defined inside a class or object is called a method.
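As a quick illustration (add is a made-up method, not part of the WordCount code):
def add(x: Int, y: Int): Int = x + y   // a method: name, typed parameters, return type, body
add(2, 3)                              // returns 5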
'val' & Type Inference
val sc = new SparkContext(conf)
We are creating a SparkContext to access the cluster. In Scala, the keyword val declares an immutable value, which means we cannot reassign sc once it is defined. We can also declare a mutable variable using the keyword var. Using immutable values (val) is the preferred approach in Scala.
A val or var always has a type. If we do not specify the type explicitly (as in this case), the Scala compiler infers it from the expression on the right-hand side. This is called type inference.
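As a quick illustration (these values are not part of the WordCount code):
val greeting = "hello"     // the compiler infers the type String; greeting cannot be reassigned
// greeting = "hi"         // would not compile: reassignment to val
var counter = 0            // the compiler infers the type Int; counter can be reassigned
counter = counter + 1
val total: Int = 40 + 2    // the type can also be written explicitly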
Higher Order Functions & Function Literals
val words = input.flatMap(line => line.split(" "))
In Scala, an argument to a method can be a val, a var, or a function. A function that takes another function as an argument is called a higher-order function. Here the method flatMap is a higher-order function, and we are passing an anonymous function (a function without a name) as the argument. In Scala, anonymous functions are called function literals; in Java, they are called lambdas.
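To see the same idea outside Spark, here is a minimal sketch using a plain Scala List (the values are made up):
val lines = List("line1 word1 word2", "line2 word3")
// flatMap is a higher-order function; the function literal line => line.split(" ")
// is passed to it, and each resulting array of words is flattened into one list
val tokens = lines.flatMap(line => line.split(" "))
// tokens: List[String] = List(line1, word1, word2, line2, word3)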
Implicit Conversion
val units = words.map ( word => (word, 1) )
val counts = units.reduceByKey ( (x, y) => x + y )
Here units is of type RDD. As can be seen in the API documentation, the type RDD does not define the method reduceByKey. Why did the Scala compiler not throw an error? The reason is implicit conversion. When the compiler finds an unknown field or method on the current instance (units), it looks for an implicit conversion from the current type (in our case RDD) to another type that defines the method (reduceByKey). In our case the compiler converts the RDD into a PairRDDFunctions (which defines reduceByKey) so that reduceByKey can be called.
In order for this implicit conversion to work, we need to import org.apache.spark.SparkContext._
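To see how this mechanism works in general, here is a minimal sketch of an implicit conversion in the Scala REPL (RichString, shout and toRichString are made-up names for illustration, not Spark API):
import scala.language.implicitConversions

class RichString(s: String) {
  def shout: String = s.toUpperCase + "!"
}

// The compiler may apply this conversion when it sees a missing method on String
implicit def toRichString(s: String): RichString = new RichString(s)

// String has no method shout, so "hello".shout is rewritten to toRichString("hello").shout,
// just as units.reduceByKey(...) is rewritten to an equivalent call on PairRDDFunctions
println("hello".shout)   // prints HELLO!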
Here is the input data we are going to test with:
mountain@mountain:~/sbook$ cat src/main/resources/data.txt
line1 word1 word2
line2 word3
line3 word1
Let us compile our code and package it into a jar.
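For sbt package to work, the project needs a build.sbt that puts Spark on the classpath. A minimal sketch is shown below; the project name, Scala version and Spark version are assumptions, so adjust them to your setup:
name := "SparkBookApp"

version := "1.0"

scalaVersion := "2.11.8"

// The Spark version here is an assumption; "provided" keeps Spark itself out of the packaged jar
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" % "provided"
With this in place, run sbt package: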
mountain@mountain:~/sbook$ sbt package
...
[info] Packaging /home/mountain/workspace/allgitrepositories/ApacheSparkBook/target/scala-2.11/sparkbookapp_2.11-1.0.jar ...
Submit the Application to Spark
spark-submit --class sbook.helloworld.WordCount target/scala-2.11/sparkbookapp_2.11-1.0.jar src/main/resources/data.txt output
Now we have the result available in the directory output:
mountain@mountain:~/sbook$ ls output
part-00000 part-00001 _SUCCESS
mountain@mountain:~/sbook$ cat output/*
(word2,1)
(line2,1)
(word3,1)
(line3,1)
(line1,1)
(word1,2)
That ends our first Scala Application in Spark