How to Parse a CSV File in Spark Using DataFrames [or] CSV to Elasticsearch through Spark

I downloaded a sample CSV file from the CSV Downloads site. Below is the Spark program in Scala that I wrote to parse the CSV file and load it into an Elasticsearch index.

Downloaded file: Real Estate Data CSV

1. Create a schema for the CSV file that you are going to load.
2. Create a singleton object that uses the spark-csv API from Databricks and the org.elasticsearch.spark.sql._ APIs.
3. Provide the configuration for Elasticsearch.
4. Load the data into ES through the ES-Spark APIs.

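import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.sql._

/**
 * Created by Varatharajan Giri Ramanathan on 2/4/2016.
 */
object CsvToEsLoadDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Column names in file order; every column is loaded as a string.
    val csvSplit = "street,city,zip,state,beds,baths,sq_ft,type,sale_date,price,latitude,longitude"
    val csvSchema = schemaDef(csvSplit)
    // Elasticsearch connection settings. The key of the third entry is
    // missing in this listing; supply the intended es.* setting name before running.
    val esConfig = Map(("es.nodes","localhost"),("es.port","9200"),("","true"),("es.http.timeout", "5m"))
    // Read the CSV file (args(0)) with the explicit schema instead of inferring types.
    val esdf = sqlContext.read.format("com.databricks.spark.csv")
      .option("header", "true") // assumption: the first line holds the column names; use "false" otherwise
      .option("inferSchema", "false")
      .schema(csvSchema)
      .load(args(0))
    //val esdf = sqlContext.read.format("com.databricks.spark.csv").option("header","false").option("inferSchema", "false").schema(rfSchema).load("/user/horf/horf_flag_rules.csv")
    esdf.saveToEs("realestatedata/data", cfg = esConfig)
  }

  // Builds a schema of nullable string columns from a comma-separated list of names.
  def schemaDef(baseSchema: String): StructType = {
    StructType(baseSchema.split(",").map(f => StructField(f, StringType, true)))
  }
}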

I built the jar with sbt package and used the command below to run the program.

D:\Spark\spark-1.5.0-bin-hadoop2.6\bin>spark-submit --packages com.databricks:spark-csv_2.10:1.3.0 --jars C:\\Users\\gfp2ram\\.ivy2\\cache\\org.elasticsearch\\elasticsearch-spark_2.10\\jars\\elasticsearch-spark_2.10-2.1.1.jar --class CsvToEsLoadDemo --master local[*] --driver-memory 500m --executor-memory 2g D:\\RnD\\Workspace\\scala\\SparkLatest\\target\\scala-2.10\\sparklatest_2.10-1.0.jar C:\\Users\\gfp2ram\\Desktop\\Sacramentorealestatetransactions.csv localhost 9200 realestatedata/data

args(0) --> Input CSV file
args(1) --> ES host name
args(2) --> ES port
args(3) --> Index name/type
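
The listing above hardcodes the host, port, and index, so here is a minimal sketch of how the four arguments could be mapped to the values that saveToEs needs. EsArgs, JobArgs, and parse are hypothetical names used only for illustration; the es.* keys are the same ones used in the listing.

```scala
// Hypothetical helper, not part of the original program: turns the four
// command-line arguments into the values needed for load and saveToEs.
object EsArgs {
  // Parsed arguments: input path, Elasticsearch settings, and index/type target.
  final case class JobArgs(inputCsv: String, esConfig: Map[String, String], resource: String)

  def parse(args: Array[String]): JobArgs = {
    // Fail early with a usage message when an argument is missing.
    require(args.length >= 4, "usage: <input.csv> <es-host> <es-port> <index/type>")
    val esConfig = Map(
      "es.nodes" -> args(1),        // Elasticsearch host name
      "es.port" -> args(2),         // Elasticsearch HTTP port
      "es.http.timeout" -> "5m"     // same timeout as in the listing
    )
    JobArgs(args(0), esConfig, args(3))
  }
}
```

Inside main, this would be used as val job = EsArgs.parse(args), then .load(job.inputCsv) and esdf.saveToEs(job.resource, cfg = job.esConfig).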

If your build.sbt declares the Spark, spark-csv, and elasticsearch-spark dependencies, you can use the Run option from your IDE directly.
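
A minimal build.sbt sketch for this might look like the following. The library versions and the project name and version are taken from the spark-submit command above (sparklatest_2.10-1.0.jar); the exact Scala 2.10 patch release is an assumption, and this is not necessarily the file used for the run described here.

```scala
// build.sbt (sketch; versions mirror the spark-submit command above)
name := "SparkLatest"

version := "1.0"

// Assumption: any 2.10.x release that matches the _2.10 artifacts.
scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.5.0",
  "org.apache.spark" %% "spark-sql" % "1.5.0",
  "com.databricks" %% "spark-csv" % "1.3.0",
  "org.elasticsearch" %% "elasticsearch-spark" % "2.1.1"
)
```

Since the program creates its SparkConf without a master, running it from the IDE also needs one to be supplied, for example with the JVM option -Dspark.master=local[*].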


This program loaded around 990 records in 10 seconds using local[*] (all local cores). That is great, right? :)


