How to Process Basic JSON Data in Spark

Below is some sample JSON data held in an in-memory collection. I used a SQLContext to read the JSON into a DataFrame, registered it as a temporary table, and queried it with Spark SQL. Please see the code below.

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by Varatharajan Giri Ramanathan on 9/6/2015.
 */

object JSONLoad {
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "D:\\hadoop\\hadoop-common-2.2.0-bin-master\\")
    val conf = new SparkConf().setAppName("JSONSpark").setMaster("local")
    val sc = new SparkContext(conf)
    val sCtx = new SQLContext(sc)
    val inputData = sc.parallelize(
      """{
        |  "firstName": "John",
        |  "lastName": "Smith",
        |  "age": 25,
        |  "address": {
        |    "streetAddress": "21 2nd Street",
        |    "city": "New York",
        |    "state": "NY",
        |    "postalCode": "10021"
        |  },
        |  "phoneNumber": [
        |    { "type": "home", "number": "212 555-1234" },
        |    { "type": "fax", "number": "646 555-4567" }
        |  ]
        |}""".stripMargin :: Nil)
    val JSONData = sCtx.read.json(inputData)
    JSONData.registerTempTable("customerform")
    // phoneNumber.number collects the "number" field of every array element,
    // so [0] picks the first one (the home phone).
    val queryOutput = sCtx.sql(
      "select firstName, lastName, age, address.streetAddress, address.city, " +
      "address.state, address.postalCode, phoneNumber.number[0] as homePhone " +
      "from customerform")
    queryOutput.take(1).foreach(println)
  }
}

Job Output:
16/01/07 18:03:03 INFO spark.SparkContext: Running Spark version 1.5.2
16/01/07 18:03:05 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/07 18:03:05 INFO spark.SecurityManager: Changing view acls to: gfp2ram
16/01/07 18:03:05 INFO spark.SecurityManager: Changing modify acls to: gfp2ram
16/01/07 18:03:05 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(gfp2ram); users with modify permissions: Set(gfp2ram)
16/01/07 18:03:06 INFO slf4j.Slf4jLogger: Slf4jLogger started
16/01/07 18:03:06 INFO Remoting: Starting remoting
16/01/07 18:03:06 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.228.192.121:55951]
16/01/07 18:03:06 INFO util.Utils: Successfully started service 'sparkDriver' on port 55951.
16/01/07 18:03:06 INFO spark.SparkEnv: Registering MapOutputTracker
16/01/07 18:03:06 INFO spark.SparkEnv: Registering BlockManagerMaster
16/01/07 18:03:06 INFO storage.DiskBlockManager: Created local directory at C:\Users\gfp2ram\AppData\Local\Temp\blockmgr-244a3ea8-de11-496f-900b-86e48ecc0542
16/01/07 18:03:06 INFO storage.MemoryStore: MemoryStore started with capacity 1951.8 MB
16/01/07 18:03:06 INFO spark.HttpFileServer: HTTP File server directory is C:\Users\gfp2ram\AppData\Local\Temp\spark-daa11ce9-8443-48bc-a4ce-0ad28f470787\httpd-2ea9995a-36a9-461d-926f-8da8fb484cd9
16/01/07 18:03:06 INFO spark.HttpServer: Starting HTTP Server
16/01/07 18:03:06 INFO server.Server: jetty-8.y.z-SNAPSHOT
16/01/07 18:03:06 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:55953
16/01/07 18:03:06 INFO util.Utils: Successfully started service 'HTTP file server' on port 55953.
16/01/07 18:03:06 INFO spark.SparkEnv: Registering OutputCommitCoordinator
16/01/07 18:03:07 INFO server.Server: jetty-8.y.z-SNAPSHOT
16/01/07 18:03:07 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
16/01/07 18:03:07 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
16/01/07 18:03:07 INFO ui.SparkUI: Started SparkUI at http://10.228.192.121:4040
16/01/07 18:03:07 WARN metrics.MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
16/01/07 18:03:07 INFO executor.Executor: Starting executor ID driver on host localhost
16/01/07 18:03:08 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55961.
16/01/07 18:03:08 INFO netty.NettyBlockTransferService: Server created on 55961
16/01/07 18:03:08 INFO storage.BlockManagerMaster: Trying to register BlockManager
16/01/07 18:03:08 INFO storage.BlockManagerMasterEndpoint: Registering block manager localhost:55961 with 1951.8 MB RAM, BlockManagerId(driver, localhost, 55961)
16/01/07 18:03:08 INFO storage.BlockManagerMaster: Registered BlockManager
16/01/07 18:03:09 INFO spark.SparkContext: Starting job: json at JSONLoad.scala:70
16/01/07 18:03:09 INFO scheduler.DAGScheduler: Got job 0 (json at JSONLoad.scala:70) with 1 output partitions
16/01/07 18:03:09 INFO scheduler.DAGScheduler: Final stage: ResultStage 0(json at JSONLoad.scala:70)
16/01/07 18:03:09 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/01/07 18:03:09 INFO scheduler.DAGScheduler: Missing parents: List()
16/01/07 18:03:09 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at json at JSONLoad.scala:70), which has no missing parents
16/01/07 18:03:09 INFO storage.MemoryStore: ensureFreeSpace(2976) called with curMem=0, maxMem=2046642094
16/01/07 18:03:09 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.9 KB, free 1951.8 MB)
16/01/07 18:03:09 INFO storage.MemoryStore: ensureFreeSpace(1730) called with curMem=2976, maxMem=2046642094
16/01/07 18:03:09 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1730.0 B, free 1951.8 MB)
16/01/07 18:03:09 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:55961 (size: 1730.0 B, free: 1951.8 MB)
16/01/07 18:03:09 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:861
16/01/07 18:03:09 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at json at JSONLoad.scala:70)
16/01/07 18:03:09 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/01/07 18:03:09 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 2384 bytes)
16/01/07 18:03:09 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
16/01/07 18:03:10 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 2146 bytes result sent to driver
16/01/07 18:03:10 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 728 ms on localhost (1/1)
16/01/07 18:03:10 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/01/07 18:03:10 INFO scheduler.DAGScheduler: ResultStage 0 (json at JSONLoad.scala:70) finished in 0.751 s
16/01/07 18:03:10 INFO scheduler.DAGScheduler: Job 0 finished: json at JSONLoad.scala:70, took 1.203895 s
16/01/07 18:03:11 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on localhost:55961 in memory (size: 1730.0 B, free: 1951.8 MB)
16/01/07 18:03:11 INFO spark.ContextCleaner: Cleaned accumulator 1
16/01/07 18:03:11 INFO storage.MemoryStore: ensureFreeSpace(104792) called with curMem=0, maxMem=2046642094
16/01/07 18:03:11 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 102.3 KB, free 1951.7 MB)
16/01/07 18:03:11 INFO storage.MemoryStore: ensureFreeSpace(11442) called with curMem=104792, maxMem=2046642094
16/01/07 18:03:11 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 11.2 KB, free 1951.7 MB)
16/01/07 18:03:11 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:55961 (size: 11.2 KB, free: 1951.8 MB)
16/01/07 18:03:11 INFO spark.SparkContext: Created broadcast 1 from take at JSONLoad.scala:72
16/01/07 18:03:11 INFO spark.SparkContext: Starting job: take at JSONLoad.scala:72
16/01/07 18:03:11 INFO scheduler.DAGScheduler: Got job 1 (take at JSONLoad.scala:72) with 1 output partitions
16/01/07 18:03:11 INFO scheduler.DAGScheduler: Final stage: ResultStage 1(take at JSONLoad.scala:72)
16/01/07 18:03:11 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/01/07 18:03:11 INFO scheduler.DAGScheduler: Missing parents: List()
16/01/07 18:03:11 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[5] at take at JSONLoad.scala:72), which has no missing parents
16/01/07 18:03:11 INFO storage.MemoryStore: ensureFreeSpace(6896) called with curMem=116234, maxMem=2046642094
16/01/07 18:03:11 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 6.7 KB, free 1951.7 MB)
16/01/07 18:03:11 INFO storage.MemoryStore: ensureFreeSpace(3521) called with curMem=123130, maxMem=2046642094
16/01/07 18:03:11 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 3.4 KB, free 1951.7 MB)
16/01/07 18:03:11 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:55961 (size: 3.4 KB, free: 1951.8 MB)
16/01/07 18:03:11 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:861
16/01/07 18:03:11 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at take at JSONLoad.scala:72)
16/01/07 18:03:11 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/01/07 18:03:11 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 2384 bytes)
16/01/07 18:03:11 INFO executor.Executor: Running task 0.0 in stage 1.0 (TID 1)
16/01/07 18:03:11 INFO codegen.GenerateMutableProjection: Code generated in 178.895888 ms
16/01/07 18:03:11 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 1). 1561 bytes result sent to driver
16/01/07 18:03:11 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 255 ms on localhost (1/1)
16/01/07 18:03:11 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
16/01/07 18:03:11 INFO scheduler.DAGScheduler: ResultStage 1 (take at JSONLoad.scala:72) finished in 0.257 s
16/01/07 18:03:11 INFO scheduler.DAGScheduler: Job 1 finished: take at JSONLoad.scala:72, took 0.269651 s
[John,Smith,25,21 2nd Street,New York,NY,10021,212 555-1234]
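
Apart from printing, individual fields can be pulled out of the returned Row objects by position. A small sketch (the index positions follow the select list above; Spark infers JSON integers as long, hence getLong rather than getInt):

val row = queryOutput.first()      // org.apache.spark.sql.Row
val firstName = row.getString(0)   // "John"
val age = row.getLong(2)           // 25 -- inferred as long
println(s"$firstName is $age years old")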

Retrieving the Schema of the DataFrame:

JSONData.printSchema()

root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- postalCode: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- streetAddress: string (nullable = true)
 |-- age: long (nullable = true)
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- phoneNumber: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- number: string (nullable = true)
 |    |    |-- type: string (nullable = true)
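
The same nested fields can also be reached through the DataFrame API instead of SQL. A minimal sketch (column names follow the schema above; explode() turns each element of the phoneNumber array into its own row):

import org.apache.spark.sql.functions.explode
import sCtx.implicits._   // enables the $"colName" column syntax

// Dot notation walks into the address struct, as in the SQL query
JSONData.select($"firstName", $"address.city", $"address.postalCode").show()

// One output row per phone number
JSONData.select(explode($"phoneNumber").as("phone"))
  .select($"phone.type", $"phone.number")
  .show()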

Getting the Explain Plan of the Query:

queryOutput.explain()   // physical plan of the SQL query (shown below)
JSONData.explain()      // works the same way on the raw DataFrame

Project [firstName#2,lastName#3,age#1L,address#0.streetAddress AS streetAddress#12,address#0.city AS city#13,address#0.state AS state#14,address#0.postalCode AS postalCode#15,phoneNumber#4.number[0] AS homePhone#5]
 Scan JSONRelation[][lastName#3,firstName#2,address#0,age#1L,phoneNumber#4]
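
If you also want the logical plans, explain takes a boolean flag:

queryOutput.explain(true)   // parsed, analyzed, and optimized logical plans plus the physical plan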

If your JSON is stored in *.json files, you can read them directly:

sCtx.read.json("/input/path/of/JSON/directory")
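
One caveat with file input: in Spark 1.x the JSON data source expects each line of a file to be a complete, self-contained JSON object, so a pretty-printed multi-line document like the sample above will come back as _corrupt_record rows. A minimal sketch, assuming a hypothetical directory /data/customers holding line-delimited JSON:

// Each line of every file under the directory must hold one full JSON object.
val fileData = sCtx.read.json("/data/customers")   // hypothetical path
fileData.registerTempTable("customers")
sCtx.sql("select firstName, address.city from customers").show()
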
The build.sbt file I used in IntelliJ (for this example only spark-core and spark-sql are strictly required; the other entries are not needed here):

name := "TestSpark"

version := "1.0"

scalaVersion := "2.10.4"


// Skip tests during assembly (requires the sbt-assembly plugin)
test in assembly := {}

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.5.2",
  "org.apache.spark" % "spark-sql_2.10" % "1.5.2",
  "org.apache.hadoop" % "hadoop-common" % "2.6.0",
  "org.apache.spark" % "spark-hive_2.10" % "1.5.2",
  "org.apache.spark" % "spark-yarn_2.10" % "1.5.2",
  "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2",
  "com.fasterxml.jackson.core" % "jackson-core" % "2.6.1",
  "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.1.1",
  "org.elasticsearch" % "elasticsearch" % "1.7.2",
  "net.sf.opencsv" % "opencsv" % "2.3",
  "org.apache.hadoop" % "hadoop-streaming" % "1.0.4"
)

Please see the blog post below on how to handle more complex JSON types in Spark.

Handling Complex JSON Data in Spark
