How to Ingest Data from Spark to ElasticSearch Index?

I was trying to do a quick POC on data ingestion from Spark into Elasticsearch nodes, and I was finally able to achieve the results. The program below ingested 829k records in under 3 minutes using a local-mode Spark configuration. Please see the program below. By default, it automatically creates the index in Elasticsearch and applies dynamic mapping.

For the Elasticsearch node configuration, please refer to my other blog post here.

Sample Data to be Indexed:

Giri Varatharajan 22
Varatharajan Giri 22
Mathan Pillai 22
Keerthi Vaasan 20
Rajeev Thanga 25
Parthi Saama 22
Parthi Cogni 25
Mannar Samy 35
Gaurav Prasad 40
Spark SQL 150
Spark Streaming 150
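Before wiring in Spark, the parsing step can be checked in isolation. This is a minimal sketch (the `parseLine` helper name is mine, not from the program below), assuming the same space-delimited layout as the sample file:

```scala
// Each whitespace-delimited line becomes one Person, which the connector
// serializes as one JSON document in the index.
case class Person(name: String, surname: String, age: Int)

def parseLine(line: String): Person = {
  val fields = line.split(" ")          // split on single spaces, as in the Spark job
  Person(fields(0), fields(1), fields(2).trim.toInt)
}

val p = parseLine("Giri Varatharajan 22")
println(p)                              // Person(Giri,Varatharajan,22)
```

The same `split(" ")` / field-index logic appears inside the Spark `map` below; if your data uses tabs or multiple spaces, adjust the delimiter accordingly.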

Scala Code:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._ // provides sqlc.esDF, used in the commented example below

/**
 * Created by Giri R Varatharajan on 9/18/2015.
 */
object ElasticSearch {
  def main(args: Array[String]) {
    System.setProperty("hadoop.home.dir", "D:\\hadoop\\hadoop-common-2.2.0-bin-master\\")
    val sparkconf = new SparkConf().setAppName("ElasticSearch").setMaster("local").set("es.index.auto.create", "true")
      .set("es.nodes", "192.168.56.102").set("es.port", "9201").set("es.http.timeout", "5m").set("es.scroll.size", "50")
    val sc = new SparkContext(sparkconf)

//Simple Index Creation with Sample data
//    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
//    val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
//    sc.makeRDD(Seq(numbers,airports)).saveToEs("spark_example1/docs")

//Simple Index Creation with Sample data present in a File
    case class Person(name: String, surname: String, age: Int)
    val people = sc.textFile("D:\\RnD\\Workspace\\scala\\TestSpark\\testData\\peoplefile").map(_.split(" ")).map(p => Person(p(0), p(1), p(2).trim.toInt))
    people.saveToEs(“spark_example1/people”)

//Connect to ES through Connection Parameters Provided Separately in a Configuration value.

//Spark 1.3 Style
//    val ESConfigOptions = Map("pushdown" -> "true", "es.nodes" -> "192.168.56.102", "es.port" -> "9201")
//    val sqlc = new SQLContext(sc)
//    val dataFrameLayout = sqlc.read.format("org.elasticsearch.spark.sql").options(ESConfigOptions).load("spark_example1/people")
//    dataFrameLayout.registerTempTable("people")
//    sqlc.sql("select * from people").collect().foreach(println)

//Primary Option to Read and Display ES Index and Data Present into It

//Spark 1.3 Style
//      val sqlc = new SQLContext(sc)
//      val sarDF = sqlc.esDF("spark_example3/people")
//      sarDF.registerTempTable("people")
//      sqlc.sql("select * from people").collect().foreach(println)
}
}

The uncommented lines above actually do the ingestion; the remaining commented code is provided for your reference.
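For completeness, the program depends on the elasticsearch-hadoop Spark connector on the classpath. A build.sbt sketch is below; the exact version numbers are assumptions from the Spark 1.3–1.5 era and should be matched to your own Spark and Elasticsearch versions:

```scala
// build.sbt (sketch): versions are illustrative assumptions, not prescriptions.
scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-core"          % "1.5.0",
  "org.apache.spark"  %% "spark-sql"           % "1.5.0",
  // elasticsearch-spark bundles the saveToEs/esDF integration used above
  "org.elasticsearch" %% "elasticsearch-spark" % "2.1.1"
)
```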

(Screenshot: the spark_example1/people index in Elasticsearch.)

Please refer to the spark_example1/people index in the image.
