Performance Benefit of Using Spark DataSet APIs (1.6.0) – An example with Spark WordCount Program

What is Spark DataSet?

As per the Apache Spark documentation, Spark Datasets are an extension of the DataFrame API that provides a type-safe, object-oriented programming interface. Spark 1.6 includes an API preview of Datasets, and they will be a development focus for the next several versions of Spark.

Like DataFrames, Datasets take advantage of Spark's Catalyst optimizer by exposing expressions and data fields to a query planner, and they leverage Tungsten's fast in-memory encoding. Datasets extend these benefits with compile-time type safety, meaning production applications can be checked for errors before they are run, and they allow direct operations over user-defined classes. Datasets are similar to RDDs; however, instead of using Java serialization or Kryo, they use a specialized Encoder to serialize objects for processing or for transmission over the network.

The core of the Dataset API is a new concept called an encoder, which is responsible for converting between JVM objects and a tabular representation. The tabular representation is stored using Spark's internal Tungsten binary format, allowing operations on serialized data and improved memory utilization.

You need Spark 1.6 to use this API. It supports all the related RDD-style operations, as described in the API documentation:

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset
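To make the encoder idea concrete, here is a minimal sketch, assuming Spark 1.6 with a local master; the Person case class and the EncoderExample object are illustrative names, not part of the word count program below.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical case class, used only to illustrate encoder-backed Datasets.
case class Person(name: String, age: Int)

object EncoderExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("EncoderExample").setMaster("local"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // brings the implicit Encoders into scope

    // toDS() uses the implicit Encoder[Person] to convert the JVM objects
    // into Spark's internal Tungsten binary representation.
    val people = Seq(Person("Alice", 29), Person("Bob", 31)).toDS()

    // Typed operations are checked at compile time: filtering on _.age compiles,
    // while a typo such as _.agee would fail the build rather than the job.
    people.filter(_.age > 30).map(_.name).foreach(println)
  }
}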

 

Below is the build.sbt file I had in my IntelliJ configuration:
name := "SparkLatest"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.6.0",
  "org.apache.spark" % "spark-sql_2.10" % "1.6.0",
  //"org.apache.hadoop" % "hadoop-common" % "2.6.0",
  //"org.apache.hadoop" % "hadoop-client" % "2.7.1",
  "org.apache.spark" % "spark-hive_2.10" % "1.6.0",
  "org.apache.spark" % "spark-yarn_2.10" % "1.6.0",
  "com.databricks" % "spark-xml_2.10" % "0.2.0"
  //"org.apache.hive" % "hive-exec" % "1.2.1",
  //"org.apache.spark" % "spark-streaming_2.10" % "1.6.0",
  //"org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.0",
  //"com.fasterxml.jackson.core" % "jackson-core" % "2.6.1",
  //"org.elasticsearch" % "elasticsearch-spark_2.10" % "2.1.1",
  //"org.elasticsearch" % "elasticsearch" % "1.7.2",
  //"net.sf.opencsv" % "opencsv" % "2.3",
  //"org.apache.hadoop" % "hadoop-streaming" % "1.0.4"
)

Input File:

I took the README.md file from the Apache Spark 1.6 download package and copied its contents over and over to create a big input file.

Case 1:

Program using Spark RDD Core APIs:

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by Varatharajan Giri Ramanathan on 1/10/2016.
 */
object WordCountDS {
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "D:\\hadoop\\hadoop-common-2.2.0-bin-master\\")
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
    val sc = new SparkContext(conf)

    // With Spark RDD core APIs: split each line into words, drop empty tokens,
    // then count occurrences with reduceByKey.
    val tf = sc.textFile("D:\\spark\\spark-1.5.0-bin-hadoop2.6\\README_test.md")
    val splits = tf.flatMap(line => line.split(" ")).filter(_ != "").map(word => (word, 1))
    splits.reduceByKey((x, y) => x + y).take(10000).foreach(println)
  }
}

Job Output:
.
.
.
16/01/10 21:45:30 INFO DAGScheduler: running: Set()
16/01/10 21:45:30 INFO DAGScheduler: waiting: Set(ResultStage 1)
16/01/10 21:45:30 INFO DAGScheduler: failed: Set()
16/01/10 21:45:30 INFO DAGScheduler: Submitting ResultStage 1 (ShuffledRDD[5] at reduceByKey at WordCountDS.scala:16), which has no missing parents
16/01/10 21:45:30 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.6 KB, free 126.6 KB)
16/01/10 21:45:30 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1605.0 B, free 128.2 KB)
16/01/10 21:45:30 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:58913 (size: 1605.0 B, free: 2.4 GB)
16/01/10 21:45:30 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
16/01/10 21:45:30 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (ShuffledRDD[5] at reduceByKey at WordCountDS.scala:16)
16/01/10 21:45:30 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/01/10 21:45:30 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,NODE_LOCAL, 1894 bytes)
16/01/10 21:45:30 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
16/01/10 21:45:30 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:58913 in memory (size: 2.3 KB, free: 2.4 GB)
16/01/10 21:45:30 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
16/01/10 21:45:30 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 5 ms
16/01/10 21:45:30 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 8701 bytes result sent to driver
16/01/10 21:45:30 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 86 ms on localhost (1/1)
16/01/10 21:45:30 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
16/01/10 21:45:30 INFO DAGScheduler: ResultStage 1 (take at WordCountDS.scala:16) finished in 0.086 s
16/01/10 21:45:30 INFO DAGScheduler: Job 0 finished: take at WordCountDS.scala:16, took 0.894642 s 
(package,608)
(For,1216)
(Programs,608)
(processing.,608)
(Because,608)
(The,608)
(cluster.,608)
(its,608)
([run,608)
(APIs,608)

The job took roughly 0.9 seconds (~894 ms) to complete and display up to 10,000 records.

Now let's look at the execution time using the Spark Dataset API.


Case 2:
Program using the Spark Dataset API:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by Varatharajan Giri Ramanathan on 1/10/2016.
 */
object WordCountDS {
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "D:\\hadoop\\hadoop-common-2.2.0-bin-master\\")
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // With the Spark Dataset API: because the Dataset version of word count can use the
    // built-in grouped count, it is not only expressed with less code but can also
    // execute significantly faster.
    import sqlContext.implicits._
    val tf = sqlContext.read.text("D:\\Spark\\spark-1.5.0-bin-hadoop2.6\\README_test.md").as[String]
    val splits = tf.flatMap(_.split(" ")).filter(_ != "")
    splits.groupBy(_.toLowerCase).count().take(100).foreach(println)
  }
}

You have to use SQLContext to read the text file, and that is what makes the difference here: the input is read straight into an encoder-backed Dataset[String] instead of a plain RDD[String].
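For a clearer picture of that difference, here is a small hedged sketch contrasting the two read paths; the file path is a placeholder and the ReadPathsSketch object is illustrative. sc.textFile gives a plain RDD[String], while sqlContext.read.text gives a single-column DataFrame that .as[String] turns into a Dataset[String].

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object ReadPathsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ReadPathsSketch").setMaster("local"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // RDD path: plain JVM Strings, serialized with Java serialization or Kryo when shuffled or cached.
    val rddLines = sc.textFile("README_test.md")

    // Dataset path: read.text yields one "value" column; .as[String] attaches a String
    // encoder so the rows stay in Spark's Tungsten binary format during processing.
    val dsLines = sqlContext.read.text("README_test.md").as[String]

    println(s"RDD lines: ${rddLines.count()}, Dataset lines: ${dsLines.count()}")
  }
}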

Job Output:

.
.
.
.
16/01/10 21:48:42 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
16/01/10 21:48:42 INFO Executor: Finished task 0.0 in stage 2.0 (TID 201). 15032 bytes result sent to driver
16/01/10 21:48:42 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 201) in 141 ms on localhost (1/1)
16/01/10 21:48:42 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
16/01/10 21:48:42 INFO DAGScheduler: ResultStage 2 (take at WordCountDS.scala:24) finished in 0.142 s
16/01/10 21:48:42 INFO DAGScheduler: Job 0 finished: take at WordCountDS.scala:24, took 7.329557 s
16/01/10 21:48:42 INFO GenerateSafeProjection: Code generated in 5.946972 ms
(module,,608)
(<class>,608)
(library,608)
(engine,608)
(graphx,608)
(wiki](https://cwiki.apache.org/confluence/display/spark).,608)
(when,608)
(instance:,608)

Note that the ~6 ms figure in the log is the time spent generating code for the final projection (the GenerateSafeProjection line); the result stage itself finished in about 142 ms. Beyond a simple word count, this encoder-based execution makes a huge difference for in-memory caching and for joins and other aggregations over very large datasets.
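As a rough illustration of that last point, here is a hedged sketch of caching a Dataset and running a typed join followed by a grouped count; the Order and Customer case classes and the sample data are made up for the example and are not from the original program.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical case classes for the illustration.
case class Order(customerId: Int, amount: Double)
case class Customer(id: Int, name: String)

object DatasetJoinSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("DatasetJoinSketch").setMaster("local"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // cache() keeps the Tungsten-encoded rows in memory, so repeated scans avoid recomputation.
    val orders = Seq(Order(1, 10.0), Order(1, 25.0), Order(2, 5.0)).toDS().cache()
    val customers = Seq(Customer(1, "Alice"), Customer(2, "Bob")).toDS()

    // joinWith produces a typed Dataset[(Order, Customer)] rather than an untyped DataFrame.
    val joined = orders.joinWith(customers, $"customerId" === $"id")

    // Count orders per customer name, using the same groupBy/count pattern as the word count.
    joined.groupBy(_._2.name).count().show()
  }
}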

 