How to Remove a Header Record and Store the Rest in a Spark SQL Table

Most CSV and XLS files are created with a header row. Here I explain how to remove that first record and store the rest of the records in a Spark SQL table.

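import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by gfp2ram on 10/30/2015.
 */
object RemoveHeader {
  // Schema for the data rows; toDF() derives the column names from these fields.
  case class csvData(sno: Int, alpha: String, nickname: String)

  def main(args: Array[String]): Unit = {
    // Windows only: tells Hadoop where to find bin\winutils.exe.
    System.setProperty("hadoop.home.dir", "D:\\hadoop\\hadoop-common-2.2.0-bin-master\\")
    val sparkconf = new SparkConf().setAppName("RemoveHeader").setMaster("local")
    val sc = new SparkContext(sparkconf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // enables .toDF() on RDDs of case classes

    // Sample CSV content: the first line is the header.
    val full_csv = sc.parallelize(Array(
      "column1, column2, column3",
      "122290, Giri, Saama",
      "122291, Mathan, Saama1",
      "122292, Venu, Saama2",
      "122293, Rajeev, Cogni",
      "122294, Parthi, Cogni",
      "122295, Parthib, Saama",
      "122296, Gaurav, Infosys",
      "122297, Giri, Saama"
    ))

    // Split each line into fields. split(",") keeps the space after each comma,
    // so the alpha and nickname values start with a space.
    val split_csv = full_csv.map(_.split(","))
    // first is an action: it brings the header line back to the driver.
    val header = split_csv.first
    // Remove the header: drop every row whose first field equals the header's first field ("column1").
    val csv_data = split_csv.filter(_(0) != header(0))
    // Turn each remaining row into a csvData instance and build a DataFrame.
    val csvDF = csv_data.map(r => csvData(r(0).toInt, r(1), r(2))).toDF()
    // Expose the DataFrame to SQL (Spark 1.x API; Spark 2.x uses createOrReplaceTempView).
    csvDF.registerTempTable("csv_samples")
    // take(10) returns up to 10 rows to the driver; print each one.
    sqlContext.sql("select sno, alpha, nickname from csv_samples").take(10).foreach(println)
  }
}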
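The filter above removes the header by value: any row whose first field equals "column1" is dropped. That works for this data, but it compares every row against the header and needs a separate `first` action to fetch it. A commonly used alternative removes the header by position with `RDD.mapPartitionsWithIndex`, dropping only the first line of partition 0. The sketch below assumes the header is the first element of partition 0, which holds for the `parallelize` sample here and for a single file read with `sc.textFile`, but not when several files (each with its own header) are read together. To stay runnable without Spark it simulates partitions with plain Scala collections; `DropHeaderSketch`, `dropHeader` and `simulate` are hypothetical names.

```scala
// Local sketch (no Spark needed): each inner Seq stands in for one RDD partition.
object DropHeaderSketch {
  // Same shape as the function RDD.mapPartitionsWithIndex expects:
  // drop only the first line of partition 0, pass every other partition through untouched.
  val dropHeader: (Int, Iterator[String]) => Iterator[String] =
    (idx, it) => if (idx == 0) it.drop(1) else it

  // Hypothetical stand-in for full_csv.mapPartitionsWithIndex(dropHeader).collect()
  def simulate(partitions: Seq[Seq[String]]): Seq[String] =
    partitions.zipWithIndex.flatMap { case (part, idx) =>
      dropHeader(idx, part.iterator).toList
    }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(
      Seq("column1, column2, column3", "122290, Giri, Saama", "122291, Mathan, Saama1"),
      Seq("122292, Venu, Saama2", "122293, Rajeev, Cogni")
    )
    simulate(partitions).foreach(println)
  }
}
```

In the job itself, the lines that compute `split_csv`, `header` and `csv_data` could then collapse to `val csv_data = full_csv.mapPartitionsWithIndex(dropHeader).map(_.split(","))`, with the rest of the program unchanged.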

Job Output:

16/01/07 18:35:01 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 1). 1768 bytes result sent to driver
16/01/07 18:35:01 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 45 ms on localhost (1/1)
16/01/07 18:35:01 INFO scheduler.DAGScheduler: ResultStage 1 (take at RemoveHeader.scala:31) finished in 0.046 s
16/01/07 18:35:01 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/01/07 18:35:01 INFO scheduler.DAGScheduler: Job 1 finished: take at RemoveHeader.scala:31, took 0.063745 s
[122290, Giri, Saama]
[122291, Mathan, Saama1]
[122292, Venu, Saama2]
[122293, Rajeev, Cogni]
[122294, Parthi, Cogni]
[122295, Parthib, Saama]
[122296, Gaurav, Infosys]
[122297, Giri, Saama]
16/01/07 18:35:01 INFO spark.SparkContext: Invoking stop() from shutdown hook
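The printed rows keep a space before the second and third fields because `split(",")` splits on the comma only, so `alpha` holds " Giri" rather than "Giri", and a query such as `where alpha = 'Giri'` would match nothing. One option, sketched below as plain Scala with no Spark dependency, is to trim every field and parse defensively with `scala.util.Try`, so that a line whose first field is not a number (such as the header) is skipped instead of failing in `toInt`. This is a swapped-in technique, not the one used above, and it assumes every line with a non-numeric first field or the wrong number of fields should be discarded; `TrimmedParseSketch`, `CsvData` and `parse` are hypothetical names.

```scala
import scala.util.Try

object TrimmedParseSketch {
  // Hypothetical name; mirrors the fields of the post's csvData.
  case class CsvData(sno: Int, alpha: String, nickname: String)

  // Split on commas, trim each field, and keep only lines with exactly three
  // fields whose first field parses as an Int. The header ("column1", ...)
  // fails toInt and is therefore dropped rather than throwing an exception.
  def parse(line: String): Option[CsvData] =
    line.split(",").map(_.trim) match {
      case Array(sno, alpha, nickname) =>
        Try(sno.toInt).toOption.map(n => CsvData(n, alpha, nickname))
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    val lines = Seq("column1, column2, column3", "122290, Giri, Saama", "122296, Gaurav, Infosys")
    lines.flatMap(line => parse(line).toList).foreach(println)
  }
}
```

On the RDD from the job, the same function could be applied as `full_csv.flatMap(line => parse(line).toList)` before calling `toDF()`.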
