How to access an XZ-compressed file in Spark?

XZ File compression:

xz is a general-purpose data compression tool with command-line syntax similar to gzip(1) and bzip2(1). The native file format is the .xz format. xz compresses or decompresses each file according to the selected operation mode. If no files are given, or if a file is -, xz reads from standard input and writes the processed data to standard output.
Let’s compress a txt file using xz as below:

C02RG37TG8WM:Downloads vgiridatabricks$ xz /Users/vgiridatabricks/Downloads/ec2blobtotxt/xztestfile.txt 
C02RG37TG8WM:Downloads vgiridatabricks$ ls -ltr /Users/vgiridatabricks/Downloads/ec2blobtotxt
total 16
-rw-r--r-- 1 vgiridatabricks staff 395 Mar 24 22:54 part-00000
-rw-r--r-- 1 vgiridatabricks staff 0 Mar 24 22:54 _SUCCESS
-rw-r--r-- 1 vgiridatabricks staff 256 Mar 25 19:57 xztestfile.txt.xz
C02RG37TG8WM:Downloads vgiridatabricks$

Spark code to read this file

1. Load the XZ file with spark.read.format("text"), relying on the io.sensesecure.hadoop.xz.XZCodec class. Add "io.sensesecure" % "hadoop-xz" % "1.4" to your build.sbt, or download the dependency jar and add it to the Spark classpath. A sketch follows this list.
2. Remove the header record (optional).
3. Create a temp view and parse the file using Spark SQL built-in functions.
4. Project the records with a SQL query or a show command.
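
Here is a minimal sketch of those steps, assuming the hadoop-xz jar is on the classpath and that registering the codec via the Hadoop io.compression.codecs property is enough for Spark to resolve the .xz extension; the input path matches the file compressed above.

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder
  .appName("XZFileHandling")
  .master("local[*]")
  .config("spark.hadoop.io.compression.codecs", "io.sensesecure.hadoop.xz.XZCodec")
  .getOrCreate()
import spark.implicits._

// 1. Load the XZ-compressed text file; the codec decompresses it transparently
val raw = spark.read.format("text")
  .load("/Users/vgiridatabricks/Downloads/ec2blobtotxt/xztestfile.txt.xz")

// 2. Drop the header record, then 3. split the CSV lines and register a temp view
val header = raw.first().getString(0)
val parsed = raw.filter($"value" =!= header)
  .withColumn("cols", split($"value", ","))
  .select(
    $"cols"(0).as("instanceid"),
    $"cols"(1).as("starttime"),
    $"cols"(2).as("deletetime"),
    $"cols"(3).as("hours"))
parsed.createOrReplaceTempView("ec2_usage")

// 4. Project the records with Spark SQL
spark.sql("select instanceid, starttime, deletetime, hours from ec2_usage").show()
```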


Output:

Make sure the XZ read is successful:

18/03/25 20:15:47 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
18/03/25 20:15:47 INFO DAGScheduler: ResultStage 0 (take at XZFileHandling.scala:22) finished in 0.194 s
18/03/25 20:15:47 INFO DAGScheduler: Job 0 finished: take at XZFileHandling.scala:22, took 0.224905 s
instanceId,startTime,deleteTime,hours
i-027fa7ccda210b4f4,2/17/17T20:21,2/17/17T21:11,5
i-07cd7100e3f54bf6a,2/17/17T20:19,2/17/17T21:11,4
i-0a2c4adbf0dc2551c,2/17/17T20:19,2/17/17T21:11,2
i-0b40b16236388973f,2/17/17T20:18,2/17/17T21:11,6
i-0cfd880722e15f19e,2/17/17T20:18,2/17/17T21:11,2
i-0cf0c73efeea14f74,2/17/17T16:21,2/17/17T17:11,1
i-0505e95bfebecd6e6,2/17/17T16:21,2/17/17T17:11,8

Spark SQL to project the records

18/03/25 20:15:49 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
18/03/25 20:15:49 INFO DAGScheduler: ResultStage 2 (show at XZFileHandling.scala:36) finished in 0.016 s
18/03/25 20:15:49 INFO DAGScheduler: Job 2 finished: show at XZFileHandling.scala:36, took 0.018275 s
+-------------------+-------------+-------------+-----+
| instanceid| starttime| deletetime|hours|
+-------------------+-------------+-------------+-----+
|i-027fa7ccda210b4f4|2/17/17T20:21|2/17/17T21:11| 5|
|i-07cd7100e3f54bf6a|2/17/17T20:19|2/17/17T21:11| 4|
|i-0a2c4adbf0dc2551c|2/17/17T20:19|2/17/17T21:11| 2|
|i-0b40b16236388973f|2/17/17T20:18|2/17/17T21:11| 6|
|i-0cfd880722e15f19e|2/17/17T20:18|2/17/17T21:11| 2|
|i-0cf0c73efeea14f74|2/17/17T16:21|2/17/17T17:11| 1|
|i-0505e95bfebecd6e6|2/17/17T16:21|2/17/17T17:11| 8|
+-------------------+-------------+-------------+-----+

How to handle BLOB data present in an XML file using Spark?

The scenario here: imagine cloud instance logs are generated as XML events, and every XML event carries EC2 instance information encoded as a Base64 string. You might want to decode it with Spark and run some analytics on the result.

My ec2rpt.xml file looks something like below:

```
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log [<!ENTITY % log SYSTEM "aws_instance">%log;]>
<log systemID="MF2018" timeZone="UTC" timeStamp="Mon Mar 25 16:00:01 2018">
<message source="ec2.log" time="Mon Mar 25 16:00:01 2018" type="sysMSG"><text/>
<detail>
<blob>aW5zdGFuY2VJZCxzdGFydFRpbWUsZGVsZXRlVGltZSxob3Vycw0KaS0wMjdmYTdjY2RhMjEwYjRmNCwyLzE3LzE3VDIwOjIxLDIvMTcvMTdUMjE6MTEsNQ0KaS0wN2NkNzEwMGUzZjU0YmY2YSwyLzE3LzE3VDIwOjE5LDIvMTcvMTdUMjE6MTEsNA0KaS0wYTJjNGFkYmYwZGMyNTUxYywyLzE3LzE3VDIwOjE5LDIvMTcvMTdUMjE6MTEsMg0KaS0wYjQwYjE2MjM2Mzg4OTczZiwyLzE3LzE3VDIwOjE4LDIvMTcvMTdUMjE6MTEsNg0KaS0wY2ZkODgwNzIyZTE1ZjE5ZSwyLzE3LzE3VDIwOjE4LDIvMTcvMTdUMjE6MTEsMg0KaS0wY2YwYzczZWZlZWExNGY3NCwyLzE3LzE3VDE2OjIxLDIvMTcvMTdUMTc6MTEsMQ0KaS0wNTA1ZTk1YmZlYmVjZDZlNiwyLzE3LzE3VDE2OjIxLDIvMTcvMTdUMTc6MTEsOA==
</blob>
</detail>
</message>
</log>
```

How to solve this easily using Spark:

1. Load the XML data
2. Use the spark-xml library to create a raw DataFrame
3. Apply a Base64 decoder on the blob column using the BASE64Decoder API
4. Save the decoded data to a text file (optional)
5. Load the text file into a Spark DataFrame and parse it
6. Register the DataFrame from step 5 as a Spark SQL table (see the sketch after this list)
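
Here is a minimal sketch of those steps, assuming an existing SparkSession named spark (for example, in spark-shell), the spark-xml attributePrefix option set to empty so the source/time/type attributes surface without the default "_" prefix, and java.util.Base64 standing in for the older BASE64Decoder API; the output path is a placeholder.

```
import java.util.Base64
import org.apache.spark.sql.functions._
import spark.implicits._

// 1.-2. Load the XML with spark-xml and pull out the attributes plus the nested blob
val raw = spark.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "message")
  .option("attributePrefix", "")
  .load("/Users/vgiridatabricks/Downloads/ec2rpt.xml")
  .select($"source", $"time", $"type", $"detail.blob".as("blob"))
raw.show()

// 3. Decode the Base64 payload; getMimeDecoder tolerates embedded line breaks,
//    and the CRLF line endings in the decoded CSV are normalized to LF
val decodeBase64 = udf((b64: String) =>
  new String(Base64.getMimeDecoder.decode(b64.trim), "UTF-8").replace("\r", ""))
val decoded = raw.select(decodeBase64($"blob").as("value"))

// 4.-5. Optionally persist the decoded CSV text, then re-read it with a header
decoded.write.mode("overwrite").text("/tmp/ec2_decoded")
val ec2 = spark.read.option("header", "true").csv("/tmp/ec2_decoded")

// 6. Register it as a Spark SQL table and query it
ec2.createOrReplaceTempView("ec2_usage")
spark.sql("select * from ec2_usage").show()
```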

 

Output:

Just try to display the raw XML data:

```
18/03/24 22:54:30 INFO NewHadoopRDD: Input split: file:/Users/vgiridatabricks/Downloads/ec2rpt.xml:0+826
18/03/24 22:54:30 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1629 bytes result sent to driver
18/03/24 22:54:30 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 19 ms on localhost (executor driver) (1/1)
18/03/24 22:54:30 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
18/03/24 22:54:30 INFO DAGScheduler: ResultStage 1 (show at SparkXMLBlob.scala:19) finished in 0.028 s
18/03/24 22:54:30 INFO DAGScheduler: Job 1 finished: show at SparkXMLBlob.scala:19, took 0.030830 s
+-------+--------------------+------+--------------------+
| source| time| type| blob|
+-------+--------------------+------+--------------------+
|ec2.log|Mon Mar 25 16:00:...|sysMSG|aW5zdGFuY2VJZCxzd...|
+-------+--------------------+------+--------------------+

18/03/24 22:54:30 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 217.0 KB, free 2004.1 MB)
18/03/24 22:54:30 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 20.6 KB, free 2004.1 MB)
18/03/24 22:54:30 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 10.10.10.104:51027 (size: 20.6 KB, free: 2004.6 MB)
```

 

Final Output:

```
18/03/24 22:54:31 INFO DAGScheduler: ResultStage 4 (show at SparkXMLBlob.scala:42) finished in 0.016 s
18/03/24 22:54:31 INFO DAGScheduler: Job 4 finished: show at SparkXMLBlob.scala:42, took 0.019120 s
18/03/24 22:54:31 INFO SparkContext: Invoking stop() from shutdown hook
+-------------------+-------------+-------------+-----+
| instanceId| startTime| deleteTime|hours|
+-------------------+-------------+-------------+-----+
|i-027fa7ccda210b4f4|2/17/17T20:21|2/17/17T21:11| 5|
|i-07cd7100e3f54bf6a|2/17/17T20:19|2/17/17T21:11| 4|
|i-0a2c4adbf0dc2551c|2/17/17T20:19|2/17/17T21:11| 2|
|i-0b40b16236388973f|2/17/17T20:18|2/17/17T21:11| 6|
|i-0cfd880722e15f19e|2/17/17T20:18|2/17/17T21:11| 2|
|i-0cf0c73efeea14f74|2/17/17T16:21|2/17/17T17:11| 1|
|i-0505e95bfebecd6e6|2/17/17T16:21|2/17/17T17:11| 8|
+-------------------+-------------+-------------+-----+

18/03/24 22:54:31 INFO SparkUI: Stopped Spark web UI at http://10.10.10.104:4040
18/03/24 22:54:31 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
```

As you can see, all the EC2 data is readable after it is decoded and stored in a Spark SQL table.

My sbt file looks like this:

name := "Spark2.0-and-greater"
version := "1.0"

//Older Scala Version
scalaVersion := "2.11.8"

val overrideScalaVersion = "2.11.8"
val sparkVersion = "2.3.0"
val sparkXMLVersion = "0.4.1"
val sparkCsvVersion = "1.4.0"
val sparkElasticVersion = "2.2.0"
val sscKafkaVersion = "1.6.2"
val sparkMongoVersion = "1.0.0"
val sparkCassandraVersion = "1.6.0"

//Override Scala Version to the above 2.11.8 version
ivyScala := ivyScala.value map { _.copy(overrideScalaVersion = true) }

resolvers ++= Seq(
  "All Spark Repository -> bintray-spark-packages" at "https://dl.bintray.com/spark-packages/maven/"
)

libraryDependencies ++= Seq(
  "org.apache.spark"      %%  "spark-core"      %   sparkVersion  exclude("jline", "2.12"),
  "org.apache.spark"      %% "spark-sql"        % sparkVersion excludeAll(ExclusionRule(organization = "jline"),ExclusionRule("name","2.12")),
  "org.apache.spark"      %% "spark-hive"       % sparkVersion,
  "org.apache.spark"      %% "spark-yarn"       % sparkVersion,
  "com.databricks"        %% "spark-xml"        % sparkXMLVersion,
  "com.databricks"        %% "spark-csv"        % sparkCsvVersion,
  "org.apache.spark"      %% "spark-graphx"     % sparkVersion,
  "org.apache.spark"      %% "spark-catalyst"   % sparkVersion,
  "org.apache.spark"      %% "spark-streaming"  % sparkVersion,
//  "com.101tec"           % "zkclient"         % "0.9",
  "org.elasticsearch"     %% "elasticsearch-spark"        %     sparkElasticVersion,
  "org.apache.spark"      %% "spark-streaming-kafka"     % sscKafkaVersion,
  "org.mongodb.spark"      % "mongo-spark-connector_2.11" %  sparkMongoVersion,
  "com.stratio.datasource" % "spark-mongodb_2.10" % "0.11.1",
  "io.sensesecure" % "hadoop-xz" % "1.4"

  // Adding this directly to build.sbt causes Guava version incompatibility issues.
  // Please look at my Spark Cassandra Guava shade project and use that jar directly.
  //"com.datastax.spark"     % "spark-cassandra-connector_2.11" % sparkCassandraVersion
)

How to use Threads in a Spark Job to Achieve Parallel Reads and Writes

Agenda:

When you have a large number of Spark tables or DataFrames to write to persistent storage, you might want to parallelize the operation as much as possible. The approach below simply uses Scala threads, one per write, so the tasks run in parallel across the CPU cores.
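
A minimal sketch of that pattern using scala.concurrent.Future, whose default thread pool sizes itself to the number of CPU cores; the table names, row counts, and output paths below are placeholders:

```
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val spark = SparkSession.builder.appName("SparkMultiThreading").getOrCreate()

// Hypothetical set of 20 DataFrames that all need to be persisted
val tables: Seq[(String, DataFrame)] =
  (1 to 20).map(i => (s"tbl_$i", spark.range(0, 1000000).toDF("id")))

// Submit each write from its own thread; the driver then schedules the 20
// save jobs concurrently instead of one after another
val writes = tables.map { case (name, df) =>
  Future {
    df.write.mode("overwrite").parquet(s"/tmp/parallel_writes/$name")
  }
}

// Block until every write has finished
writes.foreach(w => Await.result(w, Duration.Inf))
```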

 

You can see the difference in job performance when you run a similar job without threads.

You can verify the parallelism from the logs below: many save jobs start within the same second, whereas a regular, sequential Spark job doesn't show this pattern.

17/02/02 05:57:06 INFO CodeGenerator: Code generated in 165.608313 ms
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO SparkContext: Starting job: save at SparkMultiThreading.scala:41
17/02/02 05:57:07 INFO DAGScheduler: Got job 4 (save at SparkMultiThreading.scala:41) with 1 output partitions
17/02/02 05:57:07 INFO DAGScheduler: Final stage: ResultStage 0 (save at SparkMultiThreading.scala:41)
17/02/02 05:57:07 INFO DAGScheduler: Parents of final stage: List()

Spark Dataframe & Handling \001 delimiters

Agenda:

  • Create a text-formatted Hive table with a \001 delimiter and read the underlying warehouse file using Spark
  • Create a text file with a \001 delimiter and read it using Spark

Create a Dataframe and Register a Temp View:

import org.apache.spark.sql.functions._
val employee = spark.range(0, 100).select($"id".as("employee_id"), (rand() * 3).cast("int").as("dep_id"), (rand() * 40 + 20).cast("int").as("age"))

employee.createOrReplaceTempView("hive001")

//Save Table with \001 delimiter

spark.sql("select concat_ws('\001',employee_id,dep_id,age) as allnew from hive001").repartition(2).write.mode("overwrite").format("text").saveAsTable("hive001_new")

//Save a text file with \001 delimiter also for Another verification

spark.sql("select concat_ws('\001',employee_id,dep_id,age) as allnew from hive001").repartition(2).write.mode("overwrite").format("text").save("/tmp/hive001_new")

//This can further be read using csv using \001 delimiter

sqlContext.read.format("com.databricks.spark.csv").option("delimiter", "\001").load("/user/hive/warehouse/hive001_new").show(3,false)

//Split the underlying files using the \001 delimiter. It works. You can further convert the RDD to Dataframe

spark.sparkContext.textFile("/user/hive/warehouse/hive001_new").map(_.split("\001")).take(3)

res35: Array[Array[String]] = Array(Array(0, 2, 52), Array(2, 2, 30), Array(4, 1, 37))

spark.sparkContext.textFile("/tmp/hive001_new").map(_.split("\001")).take(3)

res36: Array[Array[String]] = Array(Array(0, 2, 52), Array(2, 2, 30), Array(4, 1, 37))

How to Enable WholeStageCodeGen in Spark 2.0

First, what is WholeStageCodeGen?

It is essentially hand-written-style code generation, designed based on Thomas Neumann's seminal VLDB 2011 paper. With this, Spark can actually achieve the performance of hand-written code. Hand-written code is written specifically to run that query and nothing else, and as a result it can take advantage of all the information that is known, leading to optimized code that eliminates virtual function dispatches, keeps intermediate data in CPU registers, and can be optimized by the underlying hardware. This is possible only from Spark 2.0 onwards.

Example: I am going to sum roughly one billion numbers in Spark 2.0 with WholeStageCodeGen enabled.

This was run on my Mac with the configuration below.

[Screenshot: Mac configuration used for this run]

The code and the output of the run with WholeStageCodeGen enabled are below.
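
A minimal sketch of such a benchmark (the spark.sql.codegen.wholeStage flag, the single-partition range, and the System.nanoTime timer are assumptions, not necessarily the original snippet):

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("WholeStageCodeGen")
  .master("local[1]")
  .config("spark.sql.codegen.wholeStage", "true")   // set to "false" for the comparison run
  .getOrCreate()

val start = System.nanoTime()
val agg = spark.range(0, 1000000000L, 1, 1).selectExpr("sum(id)")  // ~1 billion ids, single partition
agg.show()
println(s"Total Execution Time ${(System.nanoTime() - start) / 1e9} seconds")

// Operators covered by whole-stage codegen are marked with a leading * in the plan
agg.explain()
```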

16/08/19 22:20:17 INFO CodeGenerator: Code generated in 8.807053 ms
+------------------+
|           sum(id)|
+------------------+
|499999999500000000|
+------------------+

Total Execution Time 3.313332086 seconds
16/08/19 22:20:17 INFO SparkContext: Invoking stop() from shutdown hook

Below is the explain plan. (Wherever you see a *, it means WholeStageCodeGen has generated hand-written code prior to the aggregation.)

== Physical Plan ==
*HashAggregate(keys=[], functions=[sum(id#0L)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_sum(id#0L)])
      +- *Range (0, 1000000000, splits=1)

Now, let's run the same experiment with WholeStageCodeGen set to false.

16/08/19 22:26:43 INFO CodeGenerator: Code generated in 5.217584 ms
+------------------+
|           sum(id)|
+------------------+
|499999999500000000|
+------------------+

Total Execution Time 15.299064145 seconds

Execution took roughly 5 times longer with no WholeStageCodeGen.

Below is the explain plan (you can't see the * symbol here):

== Physical Plan ==
HashAggregate(keys=[], functions=[sum(id#0L)])
+- Exchange SinglePartition
   +- HashAggregate(keys=[], functions=[partial_sum(id#0L)])
      +- Range (0, 1000000000, splits=1)

In Spark 2.0, WholeStageCodeGen is enabled by default, so you don't have to configure it explicitly.
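
If you do want to inspect or toggle it on an existing session, the runtime config API can be used; the key is spark.sql.codegen.wholeStage:

```
// Check the current setting; "true" is the default in Spark 2.0
println(spark.conf.get("spark.sql.codegen.wholeStage"))

// Disable it for a comparison run, then switch it back on
spark.conf.set("spark.sql.codegen.wholeStage", "false")
spark.conf.set("spark.sql.codegen.wholeStage", "true")
```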