Just added a Slide Show about Structured Streaming in Apache Spark 2.0.
The code snippet below shows how to convert non-ASCII characters to a regular string and build a table using a Spark DataFrame. I created a small UDF and registered it in PySpark. Please see the code and output below.
First, what is WholeStageCodeGen?
It is essentially whole-stage code generation: Spark fuses the operators of a query into a single generated function, a design based on Thomas Neumann's seminal VLDB 2011 paper. With this, Spark can approach the performance of hand-written code. Hand-written code is written specifically to run that query and nothing else; as a result it can take advantage of all the information that is known, leading to optimized code that eliminates virtual function dispatches, keeps intermediate data in CPU registers, and can be optimized by the underlying hardware. This was introduced in Spark 2.0.
Example: I am going to sum roughly one billion numbers in Spark 2.0 with WholeStageCodeGen enabled.
The output below was produced on my Mac with the configuration shown.
The code is below:
16/08/19 22:20:17 INFO CodeGenerator: Code generated in 8.807053 ms
+------------------+
|           sum(id)|
+------------------+
|499999999500000000|
+------------------+
Total Execution Time 3.313332086 seconds
16/08/19 22:20:17 INFO SparkContext: Invoking stop() from shutdown hook
Below is the explain plan. (Wherever you see a *, it means WholeStageCodeGen has generated hand-written-style code for that operator.)
== Physical Plan ==
*HashAggregate(keys=[], functions=[sum(id#0L)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_sum(id#0L)])
      +- *Range (0, 1000000000, splits=1)
Now, let's run the same experiment with WholeStageCodeGen set to false.
16/08/19 22:26:43 INFO CodeGenerator: Code generated in 5.217584 ms
+------------------+
|           sum(id)|
+------------------+
|499999999500000000|
+------------------+
Total Execution Time 15.299064145 seconds
Execution took about five times longer without WholeStageCodeGen.
Below is the explain plan (note that the * symbol is absent here):
== Physical Plan ==
HashAggregate(keys=[], functions=[sum(id#0L)])
+- Exchange SinglePartition
   +- HashAggregate(keys=[], functions=[partial_sum(id#0L)])
      +- Range (0, 1000000000, splits=1)
In Spark 2.0, whole-stage code generation is the default (spark.sql.codegen.wholeStage is true), so you don't have to configure it explicitly.
Please see below how to create compressed files in Spark 2.0. For example, it became a lot easier to compress output: just pass the option "compression" with the value "gzip" in 2.0.
See the output in the terminal:
Please try this out.
Here is a use case I got from one of my customers. The requirement is to parse and process the records below through the CSV reader in Spark. Note that the records contain single and double quotes, as shown below.
0|"I have one double quote as the first character of the field
1|"" I have two double quotes as the first characters of the field
2|I have one " that is NOT the first character of the field"
3|I have two " that are NOT the first characters of the field"
4|I have no double quotes
5|' I have one single quote as the first character of the field
6|'' I have two single quotes as the first characters of the field
7|I have one ' that is NOT the first character of the field
8|I have two '' that are NOT the first characters of the field
How to Process this in Scala:
val addlOptions = Map(("header", "false"), ("delimiter", "|"), ("mode", "PERMISSIVE"), ("quote", null))
val df = sqlContext.read.format("csv").options(addlOptions).load("/path/to/records.txt")
Please find the snapshot of the results; I executed this in a Databricks notebook using Spark 1.6.2.
How to Process this in Python:
df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='false', delimiter='|', escape='\\', quote='\\') \
    .load('/path/to/records.txt')
Please find the snapshot of the results; I executed this in a Databricks notebook using Spark 1.6.2.
This way you can load all records present in a file, including malformed ones, through the spark-csv package without any data loss.
Spark DataFrame: Check Columns for 'N' and 'Y' Values and Convert the Corresponding Columns to Boolean Using PySpark
Assume a DataFrame has many columns of string type that only ever hold the value "N" or "Y". You might want to scan each column to determine whether this is the case, and if the values really are just Y or N, change the column type to boolean with false/true as the cell values.
What the code below does:
1. Collect the column names and column types in a Python list.
2. Iterate over the columns in a for loop and collect the distinct values of each column into a two-dimensional array.
3. In the loop, check whether the column type is string and the values are only 'N' or 'Y'.
4. If yes, convert the column to boolean and print the values as true/false; otherwise keep the original type.
See the Output:
I have attached a snapshot of the results. Note that columns flag1 and flag3 were converted to true/false, while the rest of the columns are printed with their original values.