How to Read an Elasticsearch Index and Load the Data into a Hive Table Through Spark SQL DataFrames

You may have already visited my earlier post on ES-Hive integration: Creating a Hive Table by Reading an Elasticsearch Index through Hive Queries.

Let's see here how to read data loaded into an Elasticsearch index through Spark SQL DataFrames and load it into a Hive table. In production environments, we can maintain the data sync between Elasticsearch and Hive this way. I have explained it in a few steps below.

After reading this, you will realize how easily we can sync ES and Hive data through the Spark DataFrame API, without needing much schema handling to load the data.

Step 1: Create an ES Index

Step 2: Add Field Mappings to the ES Index

Step 3: Load 10 records to the Index

Step 4: Write a Scala Code to perform ES Hive Processing 

Step 5: Display the records in the Hive Table.

Step 1: Create an ES Index. I have created players as the index name and cricket as the index type.

PUT /players
{
 "settings": {
 "number_of_shards": 1,
 "analysis": {
 "filter": {
 "autocomplete_filter": {
 "type": "edge_ngram",
 "min_gram": 1,
 "max_gram": 30
 }
 },
 "analyzer": {
 "autocomplete": {
 "type": "custom",
 "tokenizer": "standard",
 "filter": [
 "lowercase",
 "autocomplete_filter"
 ]
 },
 "string_lowercase": {
 "tokenizer": "keyword",
 "filter": "lowercase"
 }
 }
 }
 }
}
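The autocomplete analyzer defined above first lowercases each token, then the edge_ngram filter expands it into all leading prefixes between 1 and 30 characters. As a rough illustration of what that filter emits for one token (plain Python, not Elasticsearch code; the function name is mine):

```python
def edge_ngrams(token, min_gram=1, max_gram=30):
    """Mimic ES's edge_ngram filter: emit the leading prefixes of a token."""
    token = token.lower()  # the analyzer applies the lowercase filter first
    return [token[:n] for n in range(min_gram, min(max_gram, len(token)) + 1)]

print(edge_ngrams("Sachin"))  # ['s', 'sa', 'sac', 'sach', 'sachi', 'sachin']
```

This is why a query for a prefix like "sac" can match the indexed token.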

Step 2: Add Field Mappings to the ES Index

PUT /players/cricket/_mapping
{
 "cricket": {
 "properties": {
 "playerId": {
 "type": "string",
 "index": "not_analyzed"
 },
 "playerFirstName": {
 "type": "string"
 },
 "playerLastName": {
 "type": "string",
 "index": "not_analyzed"
 },
 "playerAverage": {
 "type": "string",
 "index": "not_analyzed"
 },
 "playerCountry": {
 "type": "string",
 "index": "not_analyzed"
 },
 "playerType": {
 "type": "string"
 },
 "playerBest": {
 "type": "string"
 }
 }
 }
}

Step 3: Load 10 Records into the ES Index as Below

PUT /players/cricket/1
{
 "playerId": "1000",
 "playerFirstName": "Sachin",
 "playerLastName": "Tendulkar",
 "playerAverage": "70.4",
 "playerCountry": "India",
 "playerType": "Batsman",
 "playerBest": "205"
}
PUT /players/cricket/2
{
 "playerId": "1001",
 "playerFirstName": "Jonty",
 "playerLastName": "Rhodes",
 "playerAverage": "50",
 "playerCountry": "South Africa",
 "playerType": "Best Filder",
 "playerBest": "121"
}
PUT /players/cricket/3
{
 "playerId": "1002",
 "playerFirstName": "Giri",
 "playerLastName": "Varatharajan",
 "playerAverage": "99",
 "playerCountry": "India",
 "playerType": "BigDataCricketer",
 "playerBest": "400"
}
PUT /players/cricket/4
{
 "playerId": "1003",
 "playerFirstName": "Malaikannan",
 "playerLastName": "Sankara",
 "playerAverage": "110",
 "playerCountry": "India",
 "playerType": "PredictiveModelCricketer",
 "playerBest": "399"
}
PUT /players/cricket/5
{
 "playerId": "1004",
 "playerFirstName": "Mathan",
 "playerLastName": "Pillai",
 "playerAverage": "99.9",
 "playerCountry": "India",
 "playerType": "ProdSupportCricketer",
 "playerBest": "InfiniteValue"
}
PUT /players/cricket/6
{
 "playerId": "1005",
 "playerFirstName": "Singh",
 "playerLastName": "Jaswinder",
 "playerAverage": "99.7",
 "playerCountry": "India",
 "playerType": "SaamaLeadCricketer",
 "playerBest": "199"
}
PUT /players/cricket/7
{
 "playerId": "1006",
 "playerFirstName": "Abhishek",
 "playerLastName": "Peraka",
 "playerAverage": "599",
 "playerCountry": "India",
 "playerType": "BigDataDirectCricketer",
 "playerBest": "299"
}
PUT /players/cricket/8
{
 "playerId": "1007",
 "playerFirstName": "Parthiban",
 "playerLastName": "Part1",
 "playerAverage": "199",
 "playerCountry": "India",
 "playerType": "BigData10yrsCricketer",
 "playerBest": "99.99"
}
PUT /players/cricket/9
{
 "playerId": "1008",
 "playerFirstName": "Rajeev",
 "playerLastName": "Thanga",
 "playerAverage": "189.99",
 "playerCountry": "India",
 "playerType": "BigData1QACricketer",
 "playerBest": "99.100"
}
PUT /players/cricket/10
{
 "playerId": "1009",
 "playerFirstName": "Robin",
 "playerLastName": "Singh",
 "playerAverage": "199.99",
 "playerCountry": "India",
 "playerType": "LeftHand All Rounder",
 "playerBest": "100"
}
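Ten individual PUTs are fine at this scale, but the same documents could also be indexed in a single round trip with the _bulk API. Here is a hedged Python sketch that only builds the newline-delimited bulk body for the first two players (index and type names match the ones above; actually POSTing the body to /_bulk is omitted):

```python
import json

players = [
    {"playerId": "1000", "playerFirstName": "Sachin", "playerLastName": "Tendulkar",
     "playerAverage": "70.4", "playerCountry": "India", "playerType": "Batsman",
     "playerBest": "205"},
    {"playerId": "1001", "playerFirstName": "Jonty", "playerLastName": "Rhodes",
     "playerAverage": "50", "playerCountry": "South Africa", "playerType": "Best Filder",
     "playerBest": "121"},
]

# The bulk body alternates an action line with a document line,
# and the whole payload must end with a trailing newline.
lines = []
for doc_id, doc in enumerate(players, start=1):
    lines.append(json.dumps({"index": {"_index": "players", "_type": "cricket",
                                       "_id": str(doc_id)}}))
    lines.append(json.dumps(doc))
bulk_body = "\n".join(lines) + "\n"
print(bulk_body)
```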

After this, I verified that the data was present in the index through the Head plugin.

[Screenshot: players index snapshot from the Head plugin]

Step 4: Spark SQL DataFrame Code for processing ES to Hive Integration

/*Below APIs are based on Spark 1.4.1 */
scala> val esConfig = Map("pushdown" -> "true", "es.nodes" -> "localhost", "es.port" -> "9200")
scala> val readPlayerESindex = sqlContext.read.format("org.elasticsearch.spark.sql").options(esConfig).load("players/cricket")
scala> readPlayerESindex.printSchema()
/*
root
 |-- playerAverage: string (nullable = true)
 |-- playerBest: string (nullable = true)
 |-- playerCountry: string (nullable = true)
 |-- playerFirstName: string (nullable = true)
 |-- playerId: string (nullable = true)
 |-- playerLastName: string (nullable = true)
 |-- playerType: string (nullable = true) */
scala> readPlayerESindex.take(10).foreach(println)
/*
16/01/30 19:09:08 INFO DAGScheduler: ResultStage 0 (take at <console>:24) finished in 1.210 s
16/01/30 19:09:08 INFO DAGScheduler: Job 0 finished: take at <console>:24, took 1.415645 s
[70.4,205,India,Sachin,1000,Tendulkar,Batsman]
[50,121,South Africa,Jonty,1001,Rhodes,Best Filder]
[99,400,India,Giri,1002,Varatharajan,BigDataCricketer]
[110,399,India,Malaikannan,1003,Sankara,PredictiveModelCricketer]
[99.9,InfiniteValue,India,Mathan,1004,Pillai,ProdSupportCricketer]
[99.7,199,India,Singh,1005,Jaswinder,SaamaLeadCricketer]
[599,299,India,Abhishek,1006,Peraka,BigDataDirectCricketer]
[199,99.99,India,Parthiban,1007,Part1,BigData10yrsCricketer]
[189.99,99.100,India,Rajeev,1008,Thanga,BigData1QACricketer]
[199.99,100,India,Robin,1009,Singh,LeftHand All Rounder] */
scala> readPlayerESindex.registerTempTable("players")
scala> sqlContext.sql("create table imdp.playersESHiveParq (playerAverage string,playerBest string,playerCountry string,playerFirstName string,playerId string,playerLastName string,playerType string) row format delimited fields terminated by ',' stored as parquet")
/*
res3: org.apache.spark.sql.DataFrame = [result: string] */
//I ran into permission issues, so I handled it a different way below. With proper permission to write to the table, I could simply have used:
//sqlContext.sql("insert overwrite table imdp.playersESHiveParq select playerAverage ,playerBest ,playerCountry ,playerFirstName ,playerId ,playerLastName ,playerType from players")
//Caused by: MetaException(message:java.security.AccessControlException: Permission denied: user=hive, access=WRITE, inode="/apps/hive/warehouse/playerseshive":xgfp2ram:hdfs:drwxr-xr-x
//Another easy way to store the ES data in a Hive table. But the table should have been created before calling this API.
//readPlayerESindex.saveAsTable("imdp.playersESHiveParq")
scala> readPlayerESindex.write.parquet("/user/horf/playerESHiveParq")
scala> sqlContext.sql("load data inpath '/user/horf/playerESHiveParq' into table imdp.playersESHiveParq")
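Notice that the DataFrame columns come out in alphabetical order (playerAverage first, playerType last), not in the order the fields were declared in the Step 2 mapping: the connector derives the schema from the ES mapping, and the resulting column order appears to be sorted by field name. A quick check that the printSchema() output above is just the sorted field list:

```python
# Fields in the order they were declared in the Step 2 mapping
mapping_fields = ["playerId", "playerFirstName", "playerLastName",
                  "playerAverage", "playerCountry", "playerType", "playerBest"]

# Sorting them reproduces the column order shown by printSchema() and take(10)
print(sorted(mapping_fields))
```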

Step 5: View the Final data through the Hive Table

scala> sqlContext.sql("select * from imdp.playersESHiveParq").take(10).foreach(println)
/*Output from Hive Table
[70.4,205,India,Sachin,1000,Tendulkar,Batsman]
[50,121,South Africa,Jonty,1001,Rhodes,Best Filder]
[99,400,India,Giri,1002,Varatharajan,BigDataCricketer]
[110,399,India,Malaikannan,1003,Sankara,PredictiveModelCricketer]
[99.9,InfiniteValue,India,Mathan,1004,Pillai,ProdSupportCricketer]
[99.7,199,India,Singh,1005,Jaswinder,SaamaLeadCricketer]
[599,299,India,Abhishek,1006,Peraka,BigDataDirectCricketer]
[199,99.99,India,Parthiban,1007,Part1,BigData10yrsCricketer]
[189.99,99.100,India,Rajeev,1008,Thanga,BigData1QACricketer]
[199.99,100,India,Robin,1009,Singh,LeftHand All Rounder] */

/*Use this type of Spark-shell command

spark-shell --master yarn-client --jars /app/data/workspace/elasticsearch-spark_2.10-2.1.1.jar --executor-cores 1 --executor-memory 15g --driver-memory 20g --queue default --num-executors 30

*/

Download the jar file from mvnrepository, or use the build.sbt below.

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.5.2",
  "org.apache.spark" % "spark-sql_2.10" % "1.5.2",
  "org.apache.hadoop" % "hadoop-common" % "2.6.0",
  "org.apache.spark" % "spark-hive_2.10" % "1.5.2",
  "org.apache.spark" % "spark-yarn_2.10" % "1.5.2",
  "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2",
  "com.fasterxml.jackson.core" % "jackson-core" % "2.6.1",
  "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.1.1",
  "org.elasticsearch" % "elasticsearch" % "1.7.2",
  "net.sf.opencsv" % "opencsv" % "2.3",
  "org.apache.hadoop" % "hadoop-streaming" % "1.0.4"
)

