Parsing a Basic XML using Hadoop and Spark Core APIs

In the code snippet below we can see how the record reader is configured. To read XML data it has to be a StreamXmlRecordReader, and it has to know between which XML tags it can read parallel chunks of data; in our case that is <trkseg>. Using FileInputFormat.addInputPaths() we set where the data is read from. This can be HDFS, S3, or even the local file system, as it is here; the path can point at a single GPX file, as in this example, or at a directory of them.
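
The configuration boils down to three things: the record reader class, the begin/end patterns that delimit one record, and the input path. Pulled out of the full listing below (with a relative path standing in for the Windows path used there):

import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

val jobConf = new JobConf()
// Hadoop Streaming's XML-aware record reader
jobConf.set("stream.recordreader.class",
  "org.apache.hadoop.streaming.StreamXmlRecordReader")
// Everything between the begin and end patterns becomes one record
jobConf.set("stream.recordreader.begin", "<trkseg")
jobConf.set("stream.recordreader.end", "</trkseg>")
// A local path here; an HDFS or S3 URI works the same way
FileInputFormat.addInputPaths(jobConf, "testData/xmlinput.xml")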

Input XML File:

<trk>
    <name>GPX file example</name>
    <trkseg>
        <trkpt lat="75.645722" lon="15.590474">
            <ele>597.0</ele>
            <time>2014-11-23T19:23:57Z</time>
        </trkpt>
        <trkpt lat="75.645579" lon="15.591047">
            <ele>597.0</ele>
            <time>2014-11-23T19:24:00Z</time>
        </trkpt>
        <trkpt lat="75.645598" lon="15.591242">
            <ele>598.7</ele>
            <time>2014-11-23T19:24:02Z</time>
        </trkpt>
    </trkseg>
</trk>

Spark Code Samples:

import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

//References
//http://alvinalexander.com/scala/xml-parsing-xpath-extract-xml-tag-attributes
//http://programmingstories.blogspot.com/2014/12/read-large-gpx-or-xml-files-using.html
//https://github.com/HyukjinKwon/spark-xml/raw/master/src/test/resources/books.xml

/**
 * Created by gfp2ram on 11/4/2015.
 */
object XMLParser {
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "D:\\hadoop\\hadoop-common-2.2.0-bin-master\\")
    val conf = new SparkConf().setAppName("XMLParsing").setMaster("local")
    val sc = new SparkContext(conf)

    //OPTION 1: parse an inline XML literal with scala.xml and XPath-style selectors
//    val weather =
//      <rss>
//        <channel>
//          <title>Yahoo! Weather - Boulder, CO</title>
//          <item>
//            <title>Conditions for Boulder, CO at 2:54 pm MST</title>
//            <forecast day="Thu" date="10 Nov 2011" low="37" high="58" text="Partly Cloudy"
//                      code="29" />
//          </item>
//        </channel>
//      </rss>
//    val forecast = weather \ "channel" \ "item" \ "forecast"
//    val day = forecast \ "@day"     // Thu
//    val date = forecast \ "@date"   // 10 Nov 2011
//    val low = forecast \ "@low"     // 37
//    val high = forecast \ "@high"   // 58
//    val text = forecast \ "@text"   // Partly Cloudy
//    println(" "+day+" "+date+" "+low+" "+high+" "+text)


    //OPTION 2: read <trkseg> records in parallel with Hadoop's StreamXmlRecordReader
    val jobConf = new JobConf()
    jobConf.set("stream.recordreader.class",
      "org.apache.hadoop.streaming.StreamXmlRecordReader")
    jobConf.set("stream.recordreader.begin", "<trkseg")
    jobConf.set("stream.recordreader.end", "</trkseg>")
    org.apache.hadoop.mapred.FileInputFormat.addInputPaths(jobConf, "D:\\RnD\\Workspace\\scala\\TestSpark\\testData\\xmlinput.xml")

    // Load the records (one per <trkseg> ... </trkseg> block);
    // StreamInputFormat returns the record text as the key of each pair
    val documents = sc.hadoopRDD(jobConf,  classOf[org.apache.hadoop.streaming.StreamInputFormat],
      classOf[org.apache.hadoop.io.Text],
      classOf[org.apache.hadoop.io.Text])

    import scala.xml.XML
    val texts = documents.map(_._1.toString)
      .map{ s =>
        val xml = XML.loadString(s)
        val trackpts = xml \ "trkpt"
        val gpsData = trackpts.map(
          xmlNode =>(
            (xmlNode \ "@lat").text.toDouble,
            (xmlNode \ "@lon").text.toDouble
          ))
        gpsData.toList
      }
    println(texts.first)

    //OPTION 3 --> spark-xml DataFrame read (under construction on GitHub)
//    import org.apache.spark.sql.SQLContext
//
//    val sqlContext = new SQLContext(sc)
//    val df = sqlContext.read
//    .format("org.apache.spark.sql.xml")
//    .option("rootTag", "trkseg") // This should be always given.
//    .load("D:\\RnD\\Workspace\\scala\\TestSpark\\testData\\books.xml")
//    df.collect().foreach(println)


    //OPTION 4 --> spark-xml with a custom schema (under construction on GitHub)
//    import org.apache.spark.sql.SQLContext
//    import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
//
//    val sqlContext = new SQLContext(sc)
//    val customSchema = StructType(Seq(
//      StructField("author", StringType, nullable = true),
//      StructField("description", StringType, nullable = true),
//      StructField("genre", StringType, nullable = true),
//      StructField("id", StringType, nullable = true),
//      StructField("price", DoubleType, nullable = true),
//      StructField("publish_date", StringType, nullable = true),
//      StructField("title", StringType, nullable = true)))
//
//    val df = sqlContext.read
//    .format("org.apache.spark.sql.xml")
//    .option("rootTag", "book") // This should be always given.
//    .schema(customSchema)
//    .load("books.xml")
//
//    df.select("author", "id").collect().foreach(println)
    sc.stop()
  }
}
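
For a quick sanity check outside Spark, the same extraction can be run with plain scala.xml on a single machine. A minimal sketch, assuming the GPX file is small enough to load in one piece (the relative path is illustrative):

import scala.xml.XML

object LocalGpxCheck {
  def main(args: Array[String]): Unit = {
    // Parse the whole document at once; only sensible for small files
    val root = XML.loadFile("testData/xmlinput.xml")
    // Same XPath-style selectors as in OPTION 2 above
    val points = (root \\ "trkpt").map { node =>
      ((node \ "@lat").text.toDouble, (node \ "@lon").text.toDouble)
    }
    println(points.toList)
  }
}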

Job Output:

16/01/07 18:49:53 INFO streaming.StreamBaseRecordReader: StreamBaseRecordReader.init: start_=0 end_=509 length_=509 start_ > in_.getPos() =false 0 > 0
16/01/07 18:49:53 INFO streaming.StreamBaseRecordReader: HSTR L7GCC5CG5335P92 1. pos=509 xmlinput.xml:0+509 Processing record=<trkseg>
 <trkpt lat="75.645722" lon="15.590474">
 <ele>597.0</ele>
 <time>2014-11-23T19:23:57Z</time>
 </trkpt>
 <trkpt lat="75.645579" lon="15.591047" xmlinput.xml
16/01/07 18:49:53 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 2462 bytes result sent to driver
16/01/07 18:49:53 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 271 ms on localhost (1/1)
16/01/07 18:49:53 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/01/07 18:49:53 INFO scheduler.DAGScheduler: ResultStage 0 (first at XMLParser.scala:64) finished in 0.293 s
16/01/07 18:49:53 INFO scheduler.DAGScheduler: Job 0 finished: first at XMLParser.scala:64, took 0.385569 s
List((75.645722,15.590474), (75.645579,15.591047), (75.645598,15.591242))
16/01/07 18:49:53 INFO spark.SparkContext: Invoking stop() from shutdown hook
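
The first record yields the three coordinate pairs from the sample file. From here the per-segment lists are easy to flatten into a single RDD of points; a one-line sketch, continuing from the texts RDD in the listing above:

// texts is an RDD[List[(Double, Double)]]; flatten to RDD[(Double, Double)]
val points = texts.flatMap(identity)
println(points.count())  // 3 for the sample file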