How to Read a Hollywood Movie JSON API (Web Call) through Spark SQL DataFrames and Ingest It into Elasticsearch

This is another interesting use case I wanted to try today. I wanted to read some Hollywood movie data from a web API (JSON format) and store it in Elasticsearch for my analysis. Below are the steps I performed to achieve the result.

Steps:

1. I got the Iron Man JSON file through http://www.omdbapi.com/ (it's free).

2. Download the online JSON file to my local drive with the FileUtils.copyURLToFile method from Apache Commons IO.

3. If the file already exists, my program deletes and recreates it.

4. Read the downloaded JSON through the Spark DataFrame APIs.

5. Load it into Elasticsearch as a new index (hollywood/movie) dynamically.

6. Now, read the loaded data back from Elasticsearch and display some results through the Spark sqlContext.sql method.
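
Steps 2 and 3 can also be collapsed into a single overwrite. The sketch below is not what the full program does: it swaps Commons IO for the JDK's java.nio.file.Files.copy with REPLACE_EXISTING, and the names DownloadSketch and refresh are made up for illustration. It assumes Java 7 or later and that the target's parent directory already exists.

```scala
import java.net.URL
import java.nio.file.{Files, Path, StandardCopyOption}

object DownloadSketch {
  // Copies whatever `source` points at into `target`.
  // REPLACE_EXISTING overwrites an older copy, so no separate delete is needed.
  // Returns the number of bytes written.
  def refresh(source: URL, target: Path): Long = {
    val in = source.openStream()
    try Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING)
    finally in.close()
  }
}
```

Because the function takes any URL, it can be tried against a file:// URL first, without touching the network.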

Below is the Full Program:

import java.io.File
import java.net.URL

import org.apache.commons.io.FileUtils
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by gfp2ram on 3/4/2016.
*/
object JsonToEsLoad {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("JsonToEsLoad")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Elasticsearch configuration and the local path for the downloaded file.
    val hollyWoodFile = new File("D:\\RnD\\Workspace\\scala\\SparkLatest\\src\\test\\holooywood.json")
    val esConfig = Map(("es.nodes", "localhost"), ("es.port", "9200"), ("es.index.auto.create", "true"), ("es.http.timeout", "5m"))

    // If the file already exists, delete it so that it is recreated below.
    if (hollyWoodFile.exists()) {
      println("File already exists, so deleting: " + hollyWoodFile.getAbsolutePath)
      hollyWoodFile.delete()
    }

    // Download and load on every run, whether or not an old copy existed.
    // (The call used to sit inside the if block, so a first run did nothing.)
    jsontoEsLoad(sqlContext, hollyWoodFile, esConfig)
  }

  // Recreate the file and save it to Elasticsearch.
  def jsontoEsLoad(sqlContext: SQLContext, hollyWoodFile: File, esConfig: Map[String, String]): Unit = {

    // Download the JSON from the URL to a local file. Any file system could be used here, e.g. local FS, HDFS, S3.
    FileUtils.copyURLToFile(new URL("http://www.omdbapi.com/?t=iron+man&y=&plot=full&r=json?accessType=DOWNLOAD"), hollyWoodFile)
    val jsonDf = sqlContext.read.json(hollyWoodFile.getAbsolutePath)

    // Store it in Elasticsearch; the index is created dynamically.
    import org.elasticsearch.spark.sql._
    jsonDf.saveToEs("hollywood/movie", cfg = esConfig)

    // Register the JSON DataFrame as a temp table and select some values.
    jsonDf.registerTempTable("ironman")
    sqlContext.sql("select Actors,Director,Year,Title from ironman").take(1).foreach(println)
    /*
    16/03/04 22:57:34 INFO DAGScheduler: ResultStage 1 (take at JsonToEsLoad.scala:61) finished in 0.347 s
    16/03/04 22:57:34 INFO DAGScheduler: Job 1 finished: take at JsonToEsLoad.scala:61, took 0.358971 s
    [Robert Downey Jr., Terrence Howard, Jeff Bridges, Gwyneth Paltrow,Jon Favreau,2008,Iron Man]
    */

    // Read the Elasticsearch index back and display some records.
    val readIronManIndex = sqlContext.read.format("org.elasticsearch.spark.sql").options(esConfig).load("hollywood/movie")
    readIronManIndex.registerTempTable("readIronManIndex")
    sqlContext.sql("select Actors,Director,Year,Title from readIronManIndex").take(1).foreach(println)
    /*
    16/03/04 22:59:52 INFO DAGScheduler: ResultStage 3 (take at JsonToEsLoad.scala:69) finished in 0.059 s
    16/03/04 22:59:52 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
    16/03/04 22:59:52 INFO DAGScheduler: Job 3 finished: take at JsonToEsLoad.scala:69, took 0.066117 s
    [Robert Downey Jr., Terrence Howard, Jeff Bridges, Gwyneth Paltrow,Jon Favreau,2008,Iron Man]
    */
  }
}

See the loaded data in Elasticsearch:
Index mapping properties (shown in Spark's printSchema format):
root  
|-- Actors: string (nullable = true)  
|-- Awards: string (nullable = true)  
|-- Country: string (nullable = true) 
|-- Director: string (nullable = true)  
|-- Genre: string (nullable = true)  
|-- Language: string (nullable = true) 
|-- Metascore: string (nullable = true)  
|-- Plot: string (nullable = true)  
|-- Poster: string (nullable = true)  
|-- Rated: string (nullable = true)  
|-- Released: string (nullable = true)  
|-- Response: string (nullable = true)  
|-- Runtime: string (nullable = true)  
|-- Title: string (nullable = true)  
|-- Type: string (nullable = true)  
|-- Writer: string (nullable = true)  
|-- Year: string (nullable = true)  
|-- imdbID: string (nullable = true)  
|-- imdbRating: string (nullable = true)  
|-- imdbVotes: string (nullable = true) 
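
Every field in this schema is a string, including Year, imdbRating and imdbVotes, so numeric analysis needs a conversion step first. The helper below is a minimal sketch in plain Scala and is not part of the original program. The object name MovieFields is hypothetical, the sample values in the comments are illustrative rather than taken from the API, and the comma-separated vote format is an assumption.

```scala
import scala.util.Try

object MovieFields {
  // "2008" -> Some(2008); non-numeric text such as "N/A" -> None
  def year(raw: String): Option[Int] =
    Try(raw.trim.toInt).toOption

  // "7.9" -> Some(7.9); non-numeric text -> None
  def rating(raw: String): Option[Double] =
    Try(raw.trim.toDouble).toOption

  // "1,234,567" -> Some(1234567L); assumes commas are thousands separators
  def votes(raw: String): Option[Long] =
    Try(raw.replace(",", "").trim.toLong).toOption
}
```

Returning Option keeps unparseable values visible as None instead of failing the whole job on one bad record.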
[Screenshot: JsontoESLoad.scala output in Elasticsearch]

build.sbt File:
name := "SparkLatest"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.6.0",
  "org.apache.spark" % "spark-sql_2.10" % "1.6.0",
  //"org.apache.hadoop" % "hadoop-common" % "2.6.0",
  //"org.apache.hadoop" % "hadoop-client" % "2.7.1",
  "org.apache.spark" % "spark-hive_2.10" % "1.6.0",
  "org.apache.spark" % "spark-yarn_2.10" % "1.6.0",
  "com.databricks" % "spark-xml_2.10" % "0.2.0",
  "com.databricks" % "spark-csv_2.10" % "1.3.0",
  //"org.apache.hive" % "hive-exec" % "1.2.1"
  //"org.apache.spark" % "spark-streaming_2.10" % "1.6.0",
  //"org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.0",
  //"com.fasterxml.jackson.core" % "jackson-core" % "2.6.1",
  "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.1.1"
  //"org.elasticsearch" % "elasticsearch" % "1.7.2",
  //"net.sf.opencsv" % "opencsv" % "2.3"
  //"org.apache.hadoop" % "hadoop-streaming" % "1.0.4",
)
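
The dependency list above carries several libraries that the program never imports. A trimmed build.sbt might look like the sketch below. It is untested against this project and assumes that commons-io (needed for FileUtils) keeps arriving transitively through Spark, as it appears to in the full list, where it is not declared either. The %% operator appends the Scala binary version (_2.10 here) to the artifact name.

```scala
name := "SparkLatest"

version := "1.0"

scalaVersion := "2.10.4"

// Only the libraries that JsonToEsLoad imports directly.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.0",
  "org.apache.spark" %% "spark-sql" % "1.6.0",
  "org.elasticsearch" %% "elasticsearch-spark" % "2.1.1"
)
```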
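You can try this program on much larger JSON files as well, provided each line holds one complete JSON object, which is the layout sqlContext.read.json expects. I have just shown an example with a single Iron Man movie record.
Hope you enjoyed this tutorial.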
