How to Enable WholeStageCodeGen in Spark 2.0

First, what is WholeStageCodeGen?

It is essentially code generation that produces hand-written-style code, designed based on Thomas Neumann’s seminal VLDB 2011 paper. With this, Spark can actually achieve the performance of hand-written code. Hand-written code is written specifically to run that query and nothing else, and as a result it can take advantage of all the information that is known, leading to optimized code that eliminates virtual function dispatches, keeps intermediate data in CPU registers, and can be optimized by the underlying hardware. This is available only from Spark 2.0 onwards.

Example: I am going to compute the sum of around 1 billion numbers in Spark 2.0 with WholeStageCodeGen enabled (set to true).

This output was produced on my Mac with the configuration below.

[Screenshot: Mac configuration]

The code is below:
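Here is a minimal PySpark sketch of the query; the session setup, app name, and timing code are illustrative assumptions:

import time
from pyspark.sql import SparkSession

# Illustrative setup; any Spark 2.0 session will do.
spark = SparkSession.builder.appName("WholeStageCodeGenDemo").getOrCreate()

# Sum the ids 0 .. 999,999,999.
df = spark.range(0, 1000000000).selectExpr("sum(id)")

start = time.time()
df.show()
print("Total Execution Time %s seconds" % (time.time() - start))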

16/08/19 22:20:17 INFO CodeGenerator: Code generated in 8.807053 ms
+------------------+
|           sum(id)|
+------------------+
|499999999500000000|
+------------------+

Total Execution Time 3.313332086 seconds
16/08/19 22:20:17 INFO SparkContext: Invoking stop() from shutdown hook

Below is the explain plan. (Wherever you see a *, it means that WholeStageCodeGen has generated hand-written code for that operator.)

== Physical Plan ==
*HashAggregate(keys=[], functions=[sum(id#0L)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_sum(id#0L)])
      +- *Range (0, 1000000000, splits=1)
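You can print this plan yourself by calling explain() on the DataFrame; a sketch, reusing the df from the snippet above:

# Prints the physical plan; operators prefixed with * run inside
# whole-stage generated code.
df.explain()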

Now, let’s run the same experiment with WholeStageCodeGen set to false.
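Whole-stage code generation is controlled by the spark.sql.codegen.wholeStage property; here is a sketch of disabling it at runtime before re-running the query:

# Disable whole-stage code generation for subsequent queries.
spark.conf.set("spark.sql.codegen.wholeStage", "false")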

16/08/19 22:26:43 INFO CodeGenerator: Code generated in 5.217584 ms
+------------------+
|           sum(id)|
+------------------+
|499999999500000000|
+------------------+

Total Execution Time 15.299064145 seconds

It took roughly 5x longer (15.3 seconds vs. 3.3 seconds) with no WholeStageCodeGen.

Below is the explain plan. (Note that no * symbols appear here.)

== Physical Plan ==
HashAggregate(keys=[], functions=[sum(id#0L)])
+- Exchange SinglePartition
   +- HashAggregate(keys=[], functions=[partial_sum(id#0L)])
      +- Range (0, 1000000000, splits=1)

In Spark 2.0, WholeStageCodeGen is the default, so you don’t have to configure it explicitly.
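If you want to verify the current setting, the property can be read back from the session configuration; a sketch:

# Returns "true" on a default Spark 2.0 session.
print(spark.conf.get("spark.sql.codegen.wholeStage"))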

Loading Malformed Records in Spark through CSV Reader

Here is a use case I got from one of my customers: parse and process the records below through the CSV reader in Spark. Note that the records contain single and double quotes, as shown below.

Input.txt:

0|"I have one double quote as the first character of the field
1|"" I have two double quotes as the first character of the field
2|I have one " that is NOT the first character of the field"
3|I have two " that are NOT the first characters of the field"
4|I have no double quotes
5|' I have one single quote as the first character of the field
6|'' I have two single quotes as the first characters of the field
7|I have one ' that is NOT the first character of the field
8|I have two '' that are NOT the first characters of the field

How to Process this in Scala:

val addlOptions = Map(("header", "false"), ("delimiter", "|"), ("mode", "PERMISSIVE"), ("quote", null))

val df = sqlContext.read.format("csv")
  .options(addlOptions)
  .load("/FileStore/tables/d5hw1vsq1464390309397/doublequotes.txt")

df.show()

Please find the snapshot of the results below. I executed this in a Databricks notebook using Spark 1.6.2.

[Screenshot: Scala CSV read results in Databricks]

How to Process this in Python:

sqlContext.read.format('com.databricks.spark.csv') \
  .options(header='false', delimiter='|', escape='\\', quote='\\') \
  .load('/FileStore/tables/d5hw1vsq1464390309397/doublequotes.txt') \
  .show()

Again, please find the snapshot of the results below; I executed this in a Databricks notebook using Spark 1.6.2.

[Screenshot: Python CSV read results in Databricks]

This way, you can load all the malformed records present in a file through the spark-csv package without any data loss.


Spark DataFrame: Check for Any Column Values of ‘N’ and ‘Y’ and Convert the Corresponding Column to Boolean Using PySpark

Assume there are many columns in a data frame that are of string type but always have a value of “N” or “Y”. You would like to scan each column to determine whether this is true, and if a column really contains only Y or N, change its type to boolean, with false/true as the cell values.

What the code below does:
1. Collects the column names and column types in a Python list
2. Iterates in a for loop and collects the distinct values of each column into a two-dimensional array
3. In the loop, checks whether the column type is string and the values are only ‘N’ or ‘Y’
4. If yes, converts the column to boolean and prints the values as true/false; else keeps the original type

PySpark Code:
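Here is a minimal PySpark sketch of these steps; the sample DataFrame (columns flag1, flag2, flag3, id) and the session setup are illustrative assumptions:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("YNToBoolean").getOrCreate()

# Hypothetical sample data: flag1 and flag3 hold only 'Y'/'N' values.
df = spark.createDataFrame(
    [("Y", "yes", "N", 1), ("N", "no", "Y", 2)],
    ["flag1", "flag2", "flag3", "id"])

# 1. Collect the column names and types as (name, type) pairs.
for name, dtype in df.dtypes:
    # 3. Only string columns are candidates for conversion.
    if dtype == "string":
        # 2. Collect the distinct values of this column.
        distinct_vals = {row[0] for row in df.select(name).distinct().collect()}
        # 3./4. If the column holds only 'N'/'Y', convert it to boolean.
        if distinct_vals <= {"N", "Y"}:
            df = df.withColumn(name, col(name) == "Y")

df.show()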

See the Output:

I have attached a snapshot of the results. Notice that the columns flag1 and flag3 are converted to true/false, while the rest of the columns are printed with their original values.

[Screenshot: PySpark output with flag1 and flag3 converted to boolean]