How to Parse XML data using Spark XML APIs

Analyzing XML data in Hadoop is a fairly involved process, but it is a bit easier in Spark. I have used the spark-xml APIs from Databricks. As sample input I have taken the pom.xml file below, of the kind frequently used by Java developers.

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.springframework.boot</groupId>
  <artifactId>HOSIUPortalDev</artifactId>
  <packaging>war</packaging>
  <version>0.0.1-SNAPSHOT</version>
  <name>HOSIUPortalDev Maven Webapp</name>
  <url>http://maven.apache.org</url>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.2.7.RELEASE</version>
  </parent>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>javax.servlet-api</artifactId>
      <version>3.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
      <version>1.2.5.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
      <version>1.2.5.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch</artifactId>
      <version>1.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.webjars</groupId>
      <artifactId>bootstrap</artifactId>
      <version>3.3.5</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.tomcat.embed</groupId>
      <artifactId>tomcat-embed-jasper</artifactId>
      <scope>provided</scope>
    </dependency>
  </dependencies>
  <build>
    <finalName>HOSIUPortalDev</finalName>
  </build>
</project>

Spark-XML references
path --> Location of the files. As elsewhere in Spark, it accepts standard Hadoop globbing expressions.
rowTag --> The tag in your XML files to treat as a row.
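Because the path option accepts globs, a whole directory tree of POM files can be read in one call. A minimal sketch, assuming the HiveContext created in the Scala code below and a hypothetical HDFS layout:

```scala
// Read every pom.xml under /user/horf/projects/ in a single load;
// the glob is resolved by Hadoop, as with sc.textFile.
// The path is illustrative -- adjust to your own HDFS layout.
val poms = sqlContext.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "project")
  .load("/user/horf/projects/*/pom.xml")
```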

Scala Code:

scala>import org.apache.spark.sql.hive.HiveContext
scala>val sqlContext = new HiveContext(sc)
scala>val df = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "project").load("/user/horf/pom.xml")
scala>df.registerTempTable("xmldf")
scala>val xmldfhive = sqlContext.sql("select artifactId,build,dependencies,modelversion,name,packaging,url,version,groupid,parent from xmldf")
scala>xmldfhive
/*
xmldfhive: org.apache.spark.sql.DataFrame = 
[artifactId: string, 
build: struct<finalName:string>, 
dependencies: struct<dependency:array<struct<artifactId:string,groupId:string,scope:string,version:string>>>, 
groupId: string, 
modelVersion: string, 
name: string, 
packaging: string, 
parent: struct<artifactId:string,groupId:string,version:string>, 
url: string, 
version: string, 
xmlns: string, 
xmlns:xsi: string, 
xsi:schemaLocation: string]
*/

scala>sqlContext.sql("drop table if exists IMDP.POMXML")
scala>sqlContext.sql("CREATE TABLE IMDP.POMXML(artifactid string,build struct<finalname:string>,dependencies struct<dependency:array<struct<artifactId:string,groupId:string,scope:string,version:string>>>,modelVersion string,name string,packaging string,url string,version string,groupId string, parent struct<artifactId:string,groupId:string,version:string>) STORED AS PARQUET")
scala>sqlContext.sql("dfs -rmr /user/horf/pomxmldata")
scala>xmldfhive.write.parquet("/user/horf/pomxmldata")
scala>sqlContext.sql("load data inpath '/user/horf/pomxmldata' into table IMDP.POMXML")
scala>val pomxmlhive = sqlContext.sql("select artifactid,build.finalname as finalname,dependencies.dependency.artifactid as departifactid,dependencies.dependency.groupid as depgroupid,dependencies.dependency.scope as depscope,dependencies.dependency.version as depversion,modelversion,name as projectname,packaging,url,version,groupid as groupid,parent.artifactId as parentartifactid,parent.groupId as parentgroupid,parent.version as parentversion from imdp.pomxml")
scala>pomxmlhive.printSchema()

/*
root
 |-- artifactId: string (nullable = true)
 |-- finalname: string (nullable = true)
 |-- departifactid: array (nullable = true)
 | |-- element: string (containsNull = true)
 |-- depgroupid: array (nullable = true)
 | |-- element: string (containsNull = true)
 |-- depscope: array (nullable = true)
 | |-- element: string (containsNull = true)
 |-- depversion: array (nullable = true)
 | |-- element: string (containsNull = true)
 |-- modelversion: string (nullable = true)
 |-- projectname: string (nullable = true)
 |-- packaging: string (nullable = true)
 |-- url: string (nullable = true)
 |-- version: string (nullable = true)
 |-- groupid: string (nullable = true)
 |-- parentartifactid: string (nullable = true)
 |-- parentgroupid: string (nullable = true)
 |-- parentversion: string (nullable = true)
*/
scala>pomxmlhive.take(1).foreach(println)
//Final Output
//[HOSIUPortalDev,null,ArrayBuffer(junit, javax.servlet-api, spring-boot-starter-data-elasticsearch, spring-boot-starter, elasticsearch, spring-boot-starter-web, bootstrap, gson, tomcat-embed-jasper),ArrayBuffer(junit, javax.servlet, org.springframework.boot, org.springframework.boot, org.elasticsearch, org.springframework.boot, org.webjars, com.google.code.gson, org.apache.tomcat.embed),ArrayBuffer(test, null, null, null, null, null, null, null, provided),ArrayBuffer(3.8.1, 3.1.0, 1.2.5.RELEASE, 1.2.5.RELEASE, 1.4.1, null, 3.3.5, 2.4, null),4.0.0,HOSIUPortalDev Maven Webapp,war,http://maven.apache.org,0.0.1-SNAPSHOT,org.springframework.boot,spring-boot-starter-parent,org.springframework.boot,1.2.7.RELEASE]

You can go further and use Hive's explode() with a LATERAL VIEW over the array columns; I haven't covered that in detail here.
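A sketch of that lateral-view approach, exploding the array<struct> column once so the dependency fields stay aligned (table and column names follow the example above; untested here):

```scala
// One output row per <dependency> element. Exploding the struct array
// directly avoids mis-aligned rows that separate explodes of the
// parallel departifactid/depgroupid/... arrays would produce.
val deps = sqlContext.sql("""
  SELECT artifactId      AS project,
         dep.artifactId  AS depArtifactId,
         dep.groupId     AS depGroupId,
         dep.scope       AS depScope,
         dep.version     AS depVersion
  FROM imdp.pomxml
  LATERAL VIEW explode(dependencies.dependency) d AS dep
""")
deps.show()
```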

build.sbt

name := "SparkXMLSample"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq (
 "org.apache.spark" % "spark-core_2.10" % "1.6.0",
 "org.apache.spark" % "spark-sql_2.10" % "1.6.0",
 "org.apache.spark" % "spark-hive_2.10" % "1.6.0",
 "org.apache.spark" % "spark-yarn_2.10" % "1.6.0",
 "com.databricks" % "spark-xml_2.10" % "0.2.0")
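With that build.sbt, the spark-shell session above can also be packaged as a standalone application along these lines (the object name and paths are illustrative, not from the original post):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object SparkXMLSample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SparkXMLSample"))
    val sqlContext = new HiveContext(sc)

    // Same read as in the shell session: one row per <project> element.
    val df = sqlContext.read
      .format("com.databricks.spark.xml")
      .option("rowTag", "project")
      .load("/user/horf/pom.xml")

    df.registerTempTable("xmldf")
    // Persist the parsed rows as Parquet, ready for the Hive table load.
    df.write.parquet("/user/horf/pomxmldata")

    sc.stop()
  }
}
```

Build the jar with sbt package and submit it with spark-submit instead of running the steps interactively.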

Spark-Shell commands to be used:

spark-shell --master yarn-client --packages com.databricks:spark-xml_2.10:0.3.1 --executor-cores 1 --executor-memory 15g --driver-memory 20g --queue default --num-executors 30
[or]
--Alternatively, you can download the jar from mvnrepository and use it as below.
spark-shell --master yarn-client --jars /app/data/workspace/spark-xml_2.10-0.2.0.jar --executor-cores 1 --executor-memory 15g --driver-memory 20g --queue default --num-executors 30

