How to Use Threads in a Spark Job to Achieve Parallel Reads and Writes

Agenda:

When you have a large number of Spark tables or DataFrames to write to persistent storage, you may want to parallelize the operation as much as possible. The approach I used is simple: wrap each write in a Scala thread so the driver submits the save jobs concurrently and keeps the CPU cores busy.
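The original code isn't reproduced here, so the following is only a minimal sketch of the idea, assuming a standalone application, 20 identical DataFrames standing in for the real tables, and hypothetical output paths under /tmp/parallel_write. Each thread triggers its own save, so the driver submits the jobs in parallel.

import org.apache.spark.sql.SparkSession

object SparkMultiThreading {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SparkMultiThreading")
      .getOrCreate()

    // One thread per output table; each thread submits its own save job,
    // so the scheduler can run them in parallel across the available cores.
    val threads = (1 to 20).map { i =>
      new Thread {
        override def run(): Unit = {
          spark.range(0, 1000000)
            .write
            .mode("overwrite")
            .parquet(s"/tmp/parallel_write/table_$i") // hypothetical output path
        }
      }
    }

    threads.foreach(_.start())
    threads.foreach(_.join()) // wait for every save to finish

    spark.stop()
  }
}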

 

You can see the difference in job performance when you run a similar job without using threads.

You can verify this from the logs below: multiple save jobs are submitted at the same time, whereas a regular Spark job does not show this pattern.

17/02/02 05:57:06 INFO CodeGenerator: Code generated in 165.608313 ms
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO DAGScheduler: Got job 4 (save at SparkMultiThreading.scala:41) with 1 output partitions
17/02/02 05:57:07 INFO DAGScheduler: Final stage: ResultStage 0 (save at SparkMultiThreading.scala:41)
17/02/02 05:57:07 INFO DAGScheduler: Parents of final stage: List()

Spark Dataframe & Handling \001 delimiters

Agenda:

  • Create a text-formatted Hive table with a \001 delimiter and read the underlying warehouse file using Spark
  • Create a text file with a \001 delimiter and read it using Spark

Create a Dataframe and Register a Temp View:

import org.apache.spark.sql.functions._
import spark.implicits._ // for the $"..." column syntax (pre-imported in spark-shell)

val employee = spark.range(0, 100).select($"id".as("employee_id"), (rand() * 3).cast("int").as("dep_id"), (rand() * 40 + 20).cast("int").as("age"))

employee.createOrReplaceTempView("hive001")

//Save Table with \001 delimiter

spark.sql("select concat_ws('\001',employee_id,dep_id,age) as allnew from hive001").repartition(2).write.mode("overwrite").format("text").saveAsTable("hive001_new")

//Also save a text file with the \001 delimiter for another verification

spark.sql("select concat_ws('\001',employee_id,dep_id,age) as allnew from hive001").repartition(2).write.mode("overwrite").format("text").save("/tmp/hive001_new")

//The warehouse files can also be read with the CSV reader, using \001 as the delimiter

spark.read.format("csv").option("delimiter", "\001").load("/user/hive/warehouse/hive001_new").show(3,false)

//Split the underlying files on the \001 delimiter. It works, and the resulting RDD can further be converted to a DataFrame (see the sketch after the output below)

spark.sparkContext.textFile("/user/hive/warehouse/hive001_new").map(_.split("\001")).take(3)

res35: Array[Array[String]] = Array(Array(0, 2, 52), Array(2, 2, 30), Array(4, 1, 37))

spark.sparkContext.textFile("/tmp/hive001_new").map(_.split("\001")).take(3)

res36: Array[Array[String]] = Array(Array(0, 2, 52), Array(2, 2, 30), Array(4, 1, 37))
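As noted above, the split RDD can be converted into a DataFrame. A minimal sketch, assuming a spark-shell session and the same column names used earlier:

import spark.implicits._

val employeeDF = spark.sparkContext.textFile("/user/hive/warehouse/hive001_new").map(_.split("\001")).map(a => (a(0).toLong, a(1).toInt, a(2).toInt)).toDF("employee_id", "dep_id", "age")

employeeDF.show(3, false)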

How to Enable WholeStageCodeGen in Spark 2.0

First, what is WholeStageCodeGen?

It is a code generation technique modeled on the hand-written code described in Thomas Neumann's seminal VLDB 2011 paper. With it, Spark can actually achieve the performance of hand-written code. Hand-written code is written specifically to run that query and nothing else, and as a result it can take advantage of all the information that is known, leading to optimized code that eliminates virtual function dispatches, keeps intermediate data in CPU registers, and can be optimized by the underlying hardware. This is available starting with Spark 2.0.
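Whole-stage code generation is controlled by the spark.sql.codegen.wholeStage setting, which is enabled by default in Spark 2.0. As a quick sketch, it can be toggled at runtime from spark-shell:

//Toggle whole-stage code generation for the current SparkSession
spark.conf.set("spark.sql.codegen.wholeStage", "true")   // enable (the default)
spark.conf.set("spark.sql.codegen.wholeStage", "false")  // disable, e.g. for comparison runs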

Example: I am going to sum roughly 1 billion numbers in Spark 2.0 with WholeStageCodeGen enabled.

This was run on my Mac, with the machine configuration shown below.

[Screenshot: Screen Shot 2016-08-19 at 10.28.48 PM]

The code and its output are below.
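The exact script from the run isn't reproduced here; the following is a minimal sketch of the measurement (the application name and the timing wrapper are assumptions):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

object WholeStageCodeGenSum {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("WholeStageCodeGenSum")                // hypothetical app name
      .config("spark.sql.codegen.wholeStage", "true") // explicit, although true is already the default
      .getOrCreate()

    val start = System.nanoTime()

    // Sum 1 billion ids; with whole-stage codegen the Range and HashAggregate
    // operators are fused into a single generated function.
    spark.range(0, 1000000000L).agg(sum("id")).show()

    println(s"Total Execution Time ${(System.nanoTime() - start) / 1e9} seconds")

    spark.stop()
  }
}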

16/08/19 22:20:17 INFO CodeGenerator: Code generated in 8.807053 ms
+------------------+
|           sum(id)|
+------------------+
|499999999500000000|
+------------------+

Total Execution Time 3.313332086 seconds
16/08/19 22:20:17 INFO SparkContext: Invoking stop() from shutdown hook

Below is the explain plan. (Wherever you see a *, it means whole-stage code generation has produced fused, hand-written-style code for that operator.)

== Physical Plan ==
*HashAggregate(keys=[], functions=[sum(id#0L)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_sum(id#0L)])
      +- *Range (0, 1000000000, splits=1)
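The plan above can be reproduced with Dataset.explain() on the same aggregation, for example:

import org.apache.spark.sql.functions.sum

//Print the physical plan; operators covered by whole-stage codegen are marked with a leading *
spark.range(0, 1000000000L).agg(sum("id")).explain()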

Now, let's run the same experiment with spark.sql.codegen.wholeStage set to false.

16/08/19 22:26:43 INFO CodeGenerator: Code generated in 5.217584 ms
+------------------+
|           sum(id)|
+------------------+
|499999999500000000|
+------------------+

Total Execution Time 15.299064145 seconds

It took roughly 5 times longer without WholeStageCodeGen.

Below is the explain plan. (Note that the * symbol is absent here.)

== Physical Plan ==
HashAggregate(keys=[], functions=[sum(id#0L)])
+- Exchange SinglePartition
   +- HashAggregate(keys=[], functions=[partial_sum(id#0L)])
      +- Range (0, 1000000000, splits=1)

In Spark 2.0, whole-stage code generation is enabled by default, so you don't have to configure it explicitly.