Kafka => Spark Streaming => ElasticSearch

In the last Netcat => Spark Streaming => ElasticSearch tutorial, you saw data flow from Netcat Unix streams into Elasticsearch through Spark Streaming.

In this tutorial, I will walk you through some basics of Apache Kafka and how to move data into and out of it. Then we will see how Spark Streaming can consume the data produced by Kafka, and finally how the Spark Streaming process can feed that data into Elasticsearch live.

What is Kafka:

Apache Kafka is a distributed publish-subscribe messaging system. It was originally developed at LinkedIn and later became part of the Apache project. Kafka is a fast, scalable, partitioned and replicated commit log service, distributed by design.

This URL helped me understand Kafka better.

Procedure to Install Kafka on Mac OS:

If you are a Mac user with Homebrew installed, you can install it easily with brew install kafka. For Ubuntu, Linux, Red Hat, or any other OS, please review the kafka.apache.org website for installation instructions.

Once it is installed, cd into the /usr/local/Cellar/kafka/0.9.0.1/bin directory and start the Kafka services as below.

Similarly, cd into the /usr/local/Cellar/elasticsearch-2.3.3/bin/ directory and start the Elasticsearch service as below.

#!/bin/bash

# Provide the Kafka and Elasticsearch install locations
export kafkaLocation=/usr/local/Cellar/kafka/0.9.0.1/bin
export elasticLocation=/usr/local/Cellar/elasticsearch-2.3.3/bin
export kafkatopicName=wordcounttopic

# Start the Zookeeper, Kafka, and Elasticsearch services
$kafkaLocation/zookeeper-server-start /usr/local/Cellar/kafka/0.9.0.1/libexec/config/zookeeper.properties &
$kafkaLocation/kafka-server-start /usr/local/Cellar/kafka/0.9.0.1/libexec/config/server.properties &
$elasticLocation/elasticsearch &

echo 'Kafka and Elasticsearch services started'

Start the Zookeeper Services:

./zookeeper-server-start /usr/local/Cellar/kafka/0.9.0.1/libexec/config/zookeeper.properties

[Screenshot: Zookeeper server startup output]

Start the Kafka Server as below:

./kafka-server-start /usr/local/Cellar/kafka/0.9.0.1/libexec/config/server.properties

[Screenshot: Kafka server startup output]

Start a Kafka Console Producer on the Topic:

$kafkaLocation/kafka-console-producer --broker-list localhost:9092 --topic $kafkatopicName

I typed the words below into the producer console.

Hello Kafka from Spark Streaming Learning Kafka is fun. and learning kafka with spark streaming is more fun and even when the destination is elasticsearch it will be more fun

[Screenshot: Kafka console producer with the sample words]

Now we will see how Spark Streaming consumes these messages through its built-in Kafka consumer and sends them to Elasticsearch.

Spark Program (write this in IntelliJ):

I referred to the code from the Databricks documentation and tweaked it a bit for the Elasticsearch indexing.
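The gist with the full program is not reproduced in this post, so here is a minimal sketch of what it could look like, pieced together from the driver log below (2-second batches, the checkpoint directory under /usr/local/Cellar/kafka/0.9.0.1/checkpoint, a stateful word count, and the kafkawordcount/kwc index used later in the head plugin). The object name KafkaSSCtoES comes from the log; everything else is my own approximation and may differ from the Databricks original.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.elasticsearch.spark.sql._

object KafkaSSCtoES {
  val checkpointDir = "/usr/local/Cellar/kafka/0.9.0.1/checkpoint"

  def main(args: Array[String]): Unit = {
    // Recover from the checkpoint if one exists, otherwise build a fresh streaming context
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }

  def createContext(): StreamingContext = {
    println("Creating function called to create new StreamingContext")
    val conf = new SparkConf()
      .setAppName("KafkaSSCtoES")
      .setMaster("local[*]")
      .set("es.nodes", "localhost")
      .set("es.port", "9200")
      .set("es.index.auto.create", "true")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint(checkpointDir)

    // Direct (receiver-less) Kafka stream reading the wordcounttopic topic
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("wordcounttopic"))

    // Running word count maintained across batches with updateStateByKey
    val words = messages.map(_._2).flatMap(_.split(" "))
    val wordCounts = words.map(w => (w, 1)).updateStateByKey[Int](
      (newCounts: Seq[Int], state: Option[Int]) => Some(newCounts.sum + state.getOrElse(0)))

    wordCounts.foreachRDD { rdd =>
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      import sqlContext.implicits._
      val wordcount = rdd.toDF("word", "count")
      wordcount.show()                         // prints the word/count table in the driver console
      wordcount.saveToEs("kafkawordcount/kwc") // index/type shown later in the head plugin
    }
    ssc
  }
}

The updateStateByKey call is what keeps a running total across batches, which is also why the checkpoint directory is mandatory here.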

Once you click Run, you will see the output of wordcount.show() in the driver console as below.

You will see the standard output below. The empty word-count table keeps printing until data is received from Kafka.

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/06/16 21:30:43 INFO SparkContext: Running Spark version 1.6.1
16/06/16 21:30:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/16 21:30:45 INFO SecurityManager: Changing view acls to: vgiridatabricks
16/06/16 21:30:45 INFO SecurityManager: Changing modify acls to: vgiridatabricks
16/06/16 21:30:45 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(vgiridatabricks); users with modify permissions: Set(vgiridatabricks)
16/06/16 21:30:45 INFO Utils: Successfully started service 'sparkDriver' on port 49990.
16/06/16 21:30:45 INFO Slf4jLogger: Slf4jLogger started
16/06/16 21:30:45 INFO Remoting: Starting remoting
16/06/16 21:30:45 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@10.10.10.111:49991]
16/06/16 21:30:45 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 49991.
16/06/16 21:30:45 INFO SparkEnv: Registering MapOutputTracker
16/06/16 21:30:45 INFO SparkEnv: Registering BlockManagerMaster
16/06/16 21:30:45 INFO DiskBlockManager: Created local directory at /private/var/folders/4q/fjc7vkmn4wn_m4fynrvxk2hc0000gn/T/blockmgr-568204eb-276e-4be7-8b0d-5d451f439eef
16/06/16 21:30:45 INFO MemoryStore: MemoryStore started with capacity 2.4 GB
16/06/16 21:30:45 INFO SparkEnv: Registering OutputCommitCoordinator
16/06/16 21:30:46 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/06/16 21:30:46 INFO SparkUI: Started SparkUI at http://10.10.10.111:4040
16/06/16 21:30:46 INFO Executor: Starting executor ID driver on host localhost
16/06/16 21:30:46 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 49992.
16/06/16 21:30:46 INFO NettyBlockTransferService: Server created on 49992
16/06/16 21:30:46 INFO BlockManagerMaster: Trying to register BlockManager
16/06/16 21:30:46 INFO BlockManagerMasterEndpoint: Registering block manager localhost:49992 with 2.4 GB RAM, BlockManagerId(driver, localhost, 49992)
16/06/16 21:30:46 INFO BlockManagerMaster: Registered BlockManager
16/06/16 21:30:46 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
16/06/16 21:30:46 INFO VerifiableProperties: Verifying properties
16/06/16 21:30:46 INFO VerifiableProperties: Property group.id is overridden to 
16/06/16 21:30:46 INFO VerifiableProperties: Property zookeeper.connect is overridden to 
Creating function called to create new StreamingContext
16/06/16 21:30:47 INFO WriteAheadLogManager  for Thread: Recovered 2 write ahead log files from file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/receivedBlockMetadata
16/06/16 21:30:47 INFO StateDStream: Checkpoint interval automatically set to 10000 ms
16/06/16 21:30:47 INFO ForEachDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.ForEachDStream@5626d18c
16/06/16 21:30:47 INFO StateDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.StateDStream@5634a861
16/06/16 21:30:47 INFO MappedDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.MappedDStream@66236a0a
16/06/16 21:30:47 INFO FlatMappedDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FlatMappedDStream@1dd74143
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.kafka.DirectKafkaInputDStream@7a45d714
16/06/16 21:30:47 INFO ForEachDStream: metadataCleanupDelay = -1
16/06/16 21:30:47 INFO StateDStream: metadataCleanupDelay = -1
16/06/16 21:30:47 INFO MappedDStream: metadataCleanupDelay = -1
16/06/16 21:30:47 INFO FlatMappedDStream: metadataCleanupDelay = -1
16/06/16 21:30:47 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Slide time = 2000 ms
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Checkpoint interval = null
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Remember duration = 60000 ms
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka.DirectKafkaInputDStream@7a45d714
16/06/16 21:30:47 INFO FlatMappedDStream: Slide time = 2000 ms
16/06/16 21:30:47 INFO FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/16 21:30:47 INFO FlatMappedDStream: Checkpoint interval = null
16/06/16 21:30:47 INFO FlatMappedDStream: Remember duration = 60000 ms
16/06/16 21:30:47 INFO FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@1dd74143
16/06/16 21:30:47 INFO MappedDStream: Slide time = 2000 ms
16/06/16 21:30:47 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/16 21:30:47 INFO MappedDStream: Checkpoint interval = null
16/06/16 21:30:47 INFO MappedDStream: Remember duration = 60000 ms
16/06/16 21:30:47 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@66236a0a
16/06/16 21:30:47 INFO StateDStream: Slide time = 2000 ms
16/06/16 21:30:47 INFO StateDStream: Storage level = StorageLevel(false, true, false, false, 1)
16/06/16 21:30:47 INFO StateDStream: Checkpoint interval = 10000 ms
16/06/16 21:30:47 INFO StateDStream: Remember duration = 60000 ms
16/06/16 21:30:47 INFO StateDStream: Initialized and validated org.apache.spark.streaming.dstream.StateDStream@5634a861
16/06/16 21:30:47 INFO ForEachDStream: Slide time = 2000 ms
16/06/16 21:30:47 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/16 21:30:47 INFO ForEachDStream: Checkpoint interval = null
16/06/16 21:30:47 INFO ForEachDStream: Remember duration = 60000 ms
16/06/16 21:30:47 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@5626d18c
16/06/16 21:30:47 INFO RecurringTimer: Started timer for JobGenerator at time 1466137848000
16/06/16 21:30:47 INFO JobGenerator: Started JobGenerator at 1466137848000 ms
16/06/16 21:30:47 INFO JobScheduler: Started JobScheduler
16/06/16 21:30:47 INFO StreamingContext: StreamingContext started
16/06/16 21:30:48 INFO StateDStream: Time 1466137846000 ms is invalid as zeroTime is 1466137846000 ms and slideDuration is 2000 ms and difference is 0 ms
16/06/16 21:30:48 INFO VerifiableProperties: Verifying properties
16/06/16 21:30:48 INFO VerifiableProperties: Property group.id is overridden to 
16/06/16 21:30:48 INFO VerifiableProperties: Property zookeeper.connect is overridden to 
16/06/16 21:30:48 INFO JobScheduler: Added jobs for time 1466137848000 ms
16/06/16 21:30:48 INFO JobGenerator: Checkpointing graph for time 1466137848000 ms
16/06/16 21:30:48 INFO DStreamGraph: Updating checkpoint data for time 1466137848000 ms
16/06/16 21:30:48 INFO JobScheduler: Starting job streaming job 1466137848000 ms.0 from job set of time 1466137848000 ms
16/06/16 21:30:48 INFO DStreamGraph: Updated checkpoint data for time 1466137848000 ms
16/06/16 21:30:48 INFO CheckpointWriter: Submitted checkpoint of time 1466137848000 ms writer queue
16/06/16 21:30:48 INFO CheckpointWriter: Saving checkpoint for time 1466137848000 ms to file 'file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1466137848000'
16/06/16 21:30:48 INFO CheckpointWriter: Deleting file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1465878822000.bk
16/06/16 21:30:48 INFO CheckpointWriter: Checkpoint for time 1466137848000 ms saved to file 'file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1466137848000', took 4209 bytes and 37 ms
16/06/16 21:30:49 INFO SparkContext: Starting job: show at KafkaSSCtoES.scala:71
16/06/16 21:30:49 INFO DAGScheduler: Registering RDD 2 (map at KafkaSSCtoES.scala:63)
16/06/16 21:30:49 INFO DAGScheduler: Got job 0 (show at KafkaSSCtoES.scala:71) with 1 output partitions
16/06/16 21:30:49 INFO DAGScheduler: Final stage: ResultStage 1 (show at KafkaSSCtoES.scala:71)
16/06/16 21:30:49 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
16/06/16 21:30:49 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
16/06/16 21:30:49 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at map at KafkaSSCtoES.scala:63), which has no missing parents
16/06/16 21:30:49 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 4.8 KB, free 4.8 KB)
16/06/16 21:30:49 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.5 KB, free 7.3 KB)
16/06/16 21:30:49 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:49992 (size: 2.5 KB, free: 2.4 GB)
16/06/16 21:30:49 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/06/16 21:30:49 INFO DAGScheduler: Submitting 3 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at map at KafkaSSCtoES.scala:63)
16/06/16 21:30:49 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
16/06/16 21:30:49 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,ANY, 2009 bytes)
16/06/16 21:30:49 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/06/16 21:30:49 INFO KafkaRDD: Beginning offset 4 is the same as ending offset skipping wordcounttopic 0
16/06/16 21:30:49 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1158 bytes result sent to driver
16/06/16 21:30:49 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,ANY, 2009 bytes)
16/06/16 21:30:49 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
16/06/16 21:30:49 INFO KafkaRDD: Beginning offset 5 is the same as ending offset skipping wordcounttopic 1
16/06/16 21:30:49 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1158 bytes result sent to driver
16/06/16 21:30:49 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 90 ms on localhost (1/3)
16/06/16 21:30:49 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, partition 2,ANY, 2009 bytes)
16/06/16 21:30:49 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
16/06/16 21:30:49 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 11 ms on localhost (2/3)
16/06/16 21:30:49 INFO KafkaRDD: Beginning offset 4 is the same as ending offset skipping wordcounttopic 2
16/06/16 21:30:49 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 1158 bytes result sent to driver
16/06/16 21:30:49 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 9 ms on localhost (3/3)
16/06/16 21:30:49 INFO DAGScheduler: ShuffleMapStage 0 (map at KafkaSSCtoES.scala:63) finished in 0.114 s
16/06/16 21:30:49 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/06/16 21:30:49 INFO DAGScheduler: looking for newly runnable stages
16/06/16 21:30:49 INFO DAGScheduler: running: Set()
16/06/16 21:30:49 INFO DAGScheduler: waiting: Set(ResultStage 1)
16/06/16 21:30:49 INFO DAGScheduler: failed: Set()
16/06/16 21:30:49 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[8] at show at KafkaSSCtoES.scala:71), which has no missing parents
16/06/16 21:30:49 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 9.4 KB, free 16.8 KB)
16/06/16 21:30:49 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.7 KB, free 21.4 KB)
16/06/16 21:30:49 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:49992 (size: 4.7 KB, free: 2.4 GB)
16/06/16 21:30:49 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/06/16 21:30:49 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[8] at show at KafkaSSCtoES.scala:71)
16/06/16 21:30:49 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/06/16 21:30:49 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 3, localhost, partition 0,PROCESS_LOCAL, 1894 bytes)
16/06/16 21:30:49 INFO Executor: Running task 0.0 in stage 1.0 (TID 3)
16/06/16 21:30:49 INFO CacheManager: Partition rdd_4_0 not found, computing it
16/06/16 21:30:49 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:49992 in memory (size: 2.5 KB, free: 2.4 GB)
16/06/16 21:30:49 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 3 blocks
16/06/16 21:30:49 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 7 ms
16/06/16 21:30:49 INFO MemoryStore: Block rdd_4_0 stored as bytes in memory (estimated size 4.0 B, free 14.1 KB)
16/06/16 21:30:49 INFO BlockManagerInfo: Added rdd_4_0 in memory on localhost:49992 (size: 4.0 B, free: 2.4 GB)
16/06/16 21:30:50 INFO GenerateUnsafeProjection: Code generated in 175.318511 ms
16/06/16 21:30:50 INFO JobScheduler: Added jobs for time 1466137850000 ms
16/06/16 21:30:50 INFO JobGenerator: Checkpointing graph for time 1466137850000 ms
16/06/16 21:30:50 INFO DStreamGraph: Updating checkpoint data for time 1466137850000 ms
16/06/16 21:30:50 INFO DStreamGraph: Updated checkpoint data for time 1466137850000 ms
16/06/16 21:30:50 INFO CheckpointWriter: Submitted checkpoint of time 1466137850000 ms writer queue
16/06/16 21:30:50 INFO CheckpointWriter: Saving checkpoint for time 1466137850000 ms to file 'file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1466137850000'
16/06/16 21:30:50 INFO GenerateSafeProjection: Code generated in 12.79301 ms
16/06/16 21:30:50 INFO Executor: Finished task 0.0 in stage 1.0 (TID 3). 1915 bytes result sent to driver
16/06/16 21:30:50 INFO CheckpointWriter: Deleting file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1465878822000
16/06/16 21:30:50 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 3) in 493 ms on localhost (1/1)
16/06/16 21:30:50 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
16/06/16 21:30:50 INFO CheckpointWriter: Checkpoint for time 1466137850000 ms saved to file 'file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1466137850000', took 4259 bytes and 27 ms
16/06/16 21:30:50 INFO DAGScheduler: ResultStage 1 (show at KafkaSSCtoES.scala:71) finished in 0.496 s
16/06/16 21:30:50 INFO DAGScheduler: Job 0 finished: show at KafkaSSCtoES.scala:71, took 0.865830 s
+----+-----+
|word|count|
+----+-----+
+----+-----+

Now I hit Enter in the Kafka producer console and check the driver output in IntelliJ.

[Screenshot: word counts displayed in the IntelliJ driver console]

Now see the output in the Elasticsearch head plugin:

Start the Elasticsearch service as below, if it is not already running from the startup script.

$ cd /usr/local/Cellar/elastic*/bin

./elasticsearch

As per the above program, we create the index name and type as kafkawordcount/kwc. The snapshot below shows the head plugin before the Spark Streaming job runs; you can see there is no index called kafkawordcount yet.

[Screenshot: head plugin before the Spark Streaming job runs, with no kafkawordcount index]


See the output in the Elasticsearch head plugin after the first hit of Enter in the Kafka producer console with the words below.

Hello Kafka from Spark Streaming Learning Kafka is fun. and learning kafka with spark streaming is more fun and even when the destination is elasticsearch it will be more fun

[Screenshot: word counts indexed in Elasticsearch under kafkawordcount/kwc]

See the full output:

[Screenshot: full output in the head plugin]

Hope you enjoyed this tutorial.


Netcat => Spark Streaming => ElasticSearch

In this tutorial, let me walk you through how words typed into a Unix Netcat channel are streamed through Spark and written live into an Elasticsearch index as (word, count) pairs. I am relying here on the Spark-to-Elasticsearch ingestion creating the index automatically during the ingestion process.

The steps I followed:

  1. Install Elasticsearch (brew install elasticsearch)
  2. Install the mobz/head plugin (https://github.com/mobz/elasticsearch-head)
  3. Open a Netcat channel as a Unix stream and type some words
  4. Launch a Spark program with the org.apache.spark.streaming._ libraries imported
    1. Read the socketTextStream through the Spark streaming context on port 9999
    2. flatMap the lines into words
    3. For each DStream RDD, convert it to a DataFrame and register a temp table
    4. Save each DataFrame to an Elasticsearch index
  5. Open http://localhost:9200/_plugin/head and keep refreshing the page to see the latest word, count columns in the ES index.

No Index is present:

[Screenshot: head plugin with no index present]

Create the below program in IntelliJ and execute it:
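The program itself was embedded as a gist on the original post; below is a minimal sketch that follows the steps listed above. The 5-second batch interval and the netcatwordcount/wc index/type name are my own placeholders, and the real program may keep a running count rather than a per-batch count.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.spark.sql._

object NetcatSSCtoES {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("NetcatSSCtoES")
      .setMaster("local[2]")              // at least 2 threads: one for the socket receiver, one for processing
      .set("es.nodes", "localhost")
      .set("es.port", "9200")
      .set("es.index.auto.create", "true")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Step 4.1: read the Netcat stream on port 9999
    val lines = ssc.socketTextStream("localhost", 9999)

    // Step 4.2: flatMap the lines into words and count them per batch
    val wordCounts = lines.flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _)

    // Steps 4.3 and 4.4: convert each RDD to a DataFrame, register a temp table, and save it to ES
    wordCounts.foreachRDD { rdd =>
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      import sqlContext.implicits._
      val df = rdd.toDF("word", "count")
      df.registerTempTable("wordcount")
      df.saveToEs("netcatwordcount/wc")   // hypothetical index/type name
    }

    ssc.start()
    ssc.awaitTermination()
  }
}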

After executing it in IntelliJ, you will see a streaming context launched locally, and the job keeps running in the Run console.

Open Netcat, type some words, and press Enter:

$ nc -lk 9999

Iteration 1:

(23 documents should be stored)

[Screenshot: words typed in the Netcat console]

Now see the words in Elasticsearch, with the index auto-created.

[Screenshot: auto-created index in the head plugin]

[Screenshot: word, count documents in the index]

Iteration 2:

Enter some other words and Press Enter

[Screenshot: second set of words typed in the Netcat console]

Now see it in the Elasticsearch index:

You can see the document count increased to 49, as below.

[Screenshot: document count increased to 49 in the head plugin]

[Screenshot: updated word, count documents in the index]

This was all possible with the magic of the Spark + DataFrame + ES APIs. In the same way, you can write to Cassandra, MongoDB, HBase, Oracle, MySQL, or any other destination database or system.

Now you can stop the Spark Streaming job. Hope you enjoyed this tutorial.


How to Fix Elastic Search Spark SQL Exception “org.elasticsearch.hadoop.EsHadoopIllegalStateException: Field not found; typically this occurs with arrays which are not mapped as single value”

The actual Elasticsearch index data is nested as below.

 |-- VendorInfo: struct (nullable = true)
 |    |-- VendorAddress: string (nullable = true)
 |    |-- VendorAlternativeEmail: string (nullable = true)
 |    |-- VendorCellphone: string (nullable = true)
 |    |-- VendorDescription: string (nullable = true)
 |    |-- VendorEmail: string (nullable = true)
 |    |-- VendorHomePhone: string (nullable = true)
 |    |-- VendorName: string (nullable = true)
 |    |-- VendorRole: string (nullable = true)
 |    |-- VendorTaxId: string (nullable = true)
 |    |-- VendorWorkPhone: string (nullable = true)

I tried to read the VendorInfo value through Spark SQL DataFrames as below.
val esConfig = Map(("es.nodes","localhost"),("es.port","9200"),("es.index.auto.create","false"),("es.http.timeout", "5m"))

val readEsIndex = sqlContext.read.format("org.elasticsearch.spark.sql").options(esConfig).load("indexname/type")
readEsIndex.registerTempTable("readEsIndex")
val hivedf = sqlContext.sql("select VendorInfo from readEsIndex")

I got the exception below:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 40, n01bdl603.aap.csaa.pri): org.elasticsearch.hadoop.EsHadoopIllegalStateException: Field 'VendorTaxId' not found; typically this occurs with arrays which are not mapped as single value.

How did I resolve this?
1. Added the "es.read.field.as.array" configuration to the esConfig

2. Added the below version of the Elasticsearch Spark package
libraryDependencies += "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.2.0"

3. Added the imports below
import org.elasticsearch.hadoop.cfg.ConfigurationOptions._
import org.elasticsearch.hadoop.cfg.PropertiesSettings
import org.elasticsearch.hadoop.util.StringUtils
import org.elasticsearch.spark.cfg._
import org.elasticsearch.spark.sql._
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL
import org.elasticsearch.spark.sql.sqlContextFunctions
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException
import org.elasticsearch.hadoop.serialization.EsHadoopSerializationException
import org.apache.spark.sql.types.DoubleType

4. Re-tried as below:
val esConfig = Map(("es.nodes","localhost"),("es.port","9200"),("es.index.auto.create","false"),("es.http.timeout", "5m"),("es.read.field.as.array","VendorInfo.VendorAddress,VendorInfo.VendorAlternativeEmail,VendorInfo.VendorCellphone,VendorInfo.VendorDescription,VendorInfo.VendorEmail,VendorInfo.VendorHomePhone,VendorInfo.VendorName,VendorInfo.VendorRole,VendorInfo.VendorTaxId,VendorInfo.VendorWorkPhone"))

val readEsIndex = sqlContext.read.format("org.elasticsearch.spark.sql").options(esConfig).load("indexname/type")
readEsIndex.registerTempTable("readEsIndex")
val hivedf = sqlContext.sql("select VendorInfo from readEsIndex")

It worked fine and returned the records.

How to Parse a CSV File in Spark Using DataFrames [or] CSV to Elasticsearch through Spark

I downloaded a sample CSV file from the CSV Downloads site. Below is the Spark program in Scala I created to parse the CSV file and load it into an Elasticsearch index.

Downloaded file: Real Estate Data CSV

Steps:
1. Create a schema for the CSV file you are going to load
2. Create a singleton object that uses the spark-csv API from Databricks and the org.elasticsearch.spark.sql._ APIs
3. Provide the configuration for Elasticsearch
4. Load to ES through the ES-Spark APIs

Program:
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.sql._

/**
 * Created by Varatharajan Giri Ramanathan on 2/4/2016.
 */
object CsvToEsLoadDemo {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("EsIndexDataLoad")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val csvSplit = "street,city,zip,state,beds,baths,sq_ft,type,sale_date,price,latitude,longitude"
    val csvSchema = schemaDef(csvSplit)
    val esConfig = Map(("es.nodes","localhost"),("es.port","9200"),("es.index.auto.create","true"),("es.http.timeout", "5m"))
    val esdf = sqlContext.read.format("com.databricks.spark.csv")
      .option("header","false")
      .option("inferSchema", "false")
      .schema(csvSchema)
      .load("C:\\Users\\gfp2ram\\Desktop\\Sacramentorealestatetransactions.csv")
    //val esdf = sqlContext.read.format("com.databricks.spark.csv").option("header","false").option("inferSchema", "false").schema(rfSchema).load("/user/horf/horf_flag_rules.csv")
    esdf.saveToEs("realestatedata/data",cfg = esConfig)
  }
  def schemaDef(baseSchema:String) : StructType = {
    return StructType(baseSchema.split(",").map(f => StructField(f, StringType, true)))
  }
}

I created an sbt package and used the command below to execute the program.

D:\Spark\spark-1.5.0-bin-hadoop2.6\bin>spark-submit --packages com.databricks:spark-csv_2.10:1.3.0 --jars C:\\Users\\gfp2ram\\.ivy2\\cache\\org.elasticsearch\\elasticsearch-spark_2.10\\jars\\elasticsearch-spark_2.10-2.1.1.jar --class CsvToEsLoadDemo --master local[*] --driver-memory 500m --executor-memory 2g D:\\RnD\\Workspace\\scala\\SparkLatest\\target\\scala-2.10\\sparklatest_2.10-1.0.jar C:\\Users\\gfp2ram\\Desktop\\Sacramentorealestatetransactions.csv localhost 9200 realestatedata/data

args(0) --> input CSV file
args(1) --> ES host name
args(2) --> ES port
args(3) --> index name/type

If you set up your build.sbt as below, you can use the Run option from your IDE directly.
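The build.sbt itself did not survive into this copy of the post; a minimal sketch, assuming Scala 2.10 and the spark-csv/elasticsearch-spark versions visible in the spark-submit command above, would look roughly like this.

name := "SparkLatest"

version := "1.0"

scalaVersion := "2.10.5"   // assumed; any 2.10.x matching your Spark build should do

libraryDependencies ++= Seq(
  "org.apache.spark"  % "spark-core_2.10"          % "1.5.0",
  "org.apache.spark"  % "spark-sql_2.10"           % "1.5.0",
  "com.databricks"    % "spark-csv_2.10"           % "1.3.0",
  "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.1.1"
)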

Output:

This program loaded around 990 records in about 10 seconds using local[*] cores. That is great, right? :)




How to Read an Elasticsearch Index and Load the Data into a Hive Table Through Spark SQL DataFrames?

You may have already visited my earlier post on ES-Hive integration: Creating a Hive Table by Reading an Elasticsearch Index through Hive Queries.

Let's see how to read the data loaded in an Elasticsearch index through Spark SQL DataFrames and load it into a Hive table. In production environments, we can maintain the data sync-up between Elasticsearch and Hive this way. I have explained this in a few steps below.

After reading this, you will realize how easily we can sync ES and Hive data through the Spark DataFrame APIs without needing many schema references to load the data.

Step 1: Create an ES index

Step 2: Add field mappings to the ES index

Step 3: Load 10 records into the index

Step 4: Write Scala code to perform the ES-to-Hive processing

Step 5: Display the records in the Hive table.

Step 1: Create an ES index. I have created players as the index name and cricket as the index type.

PUT /players
{
 "settings": {
 "number_of_shards": 1,
 "analysis": {
 "filter": {
 "autocomplete_filter": {
 "type": "edge_ngram",
 "min_gram": 1,
 "max_gram": 30
 }
 },
 "analyzer": {
 "autocomplete": {
 "type": "custom",
 "tokenizer": "standard",
 "filter": [
 "lowercase",
 "autocomplete_filter"
 ]
 },
 "string_lowercase": {
 "tokenizer": "keyword",
 "filter": "lowercase"
 }
 }
 }
 }
}

Step 2: Add field mappings to the ES index

PUT /players/cricket/_mapping
{
 "cricket": {
 "properties": {
 "playerId": {
 "type": "string",
 "index": "not_analyzed"
 },
 "playerFirstName": {
 "type": "string"
 },
 "playerLastName": {
 "type": "string",
 "index": "not_analyzed"
 },
 "playerAverage": {
 "type": "string",
 "index": "not_analyzed"
 },
 "playerCountry": {
 "type": "string",
 "index": "not_analyzed"
 },
 "playerType": {
 "type": "string"
 },
 "playerBest": {
 "type": "string"
 }
 }
 }
}

Step 3: Load around 10 records into the ES index as below

PUT /players/cricket/1
{
 "playerId": "1000",
 "playerFirstName": "Sachin",
 "playerLastName": "Tendulkar",
 "playerAverage": "70.4",
 "playerCountry": "India",
 "playerType": "Batsman",
 "playerBest": "205"
}
PUT /players/cricket/2
{
 "playerId": "1001",
 "playerFirstName": "Jonty",
 "playerLastName": "Rhodes",
 "playerAverage": "50",
 "playerCountry": "South Africa",
 "playerType": "Best Filder",
 "playerBest": "121"
}
PUT /players/cricket/3
{
 "playerId": "1002",
 "playerFirstName": "Giri",
 "playerLastName": "Varatharajan",
 "playerAverage": "99",
 "playerCountry": "India",
 "playerType": "BigDataCricketer",
 "playerBest": "400"
}
PUT /players/cricket/4
{
 "playerId": "1003",
 "playerFirstName": "Malaikannan",
 "playerLastName": "Sankara",
 "playerAverage": "110",
 "playerCountry": "India",
 "playerType": "PredictiveModelCricketer",
 "playerBest": "399"
}
PUT /players/cricket/5
{
 "playerId": "1004",
 "playerFirstName": "Mathan",
 "playerLastName": "Pillai",
 "playerAverage": "99.9",
 "playerCountry": "India",
 "playerType": "ProdSupportCricketer",
 "playerBest": "InfiniteValue"
}
PUT /players/cricket/6
{
 "playerId": "1005",
 "playerFirstName": "Singh",
 "playerLastName": "Jaswinder",
 "playerAverage": "99.7",
 "playerCountry": "India",
 "playerType": "SaamaLeadCricketer",
 "playerBest": "199"
}
PUT /players/cricket/7
{
 "playerId": "1006",
 "playerFirstName": "Abhishek",
 "playerLastName": "Peraka",
 "playerAverage": "599",
 "playerCountry": "India",
 "playerType": "BigDataDirectCricketer",
 "playerBest": "299"
}
PUT /players/cricket/8
{
 "playerId": "1007",
 "playerFirstName": "Parthiban",
 "playerLastName": "Part1",
 "playerAverage": "199",
 "playerCountry": "India",
 "playerType": "BigData10yrsCricketer",
 "playerBest": "99.99"
}
PUT /players/cricket/9
{
 "playerId": "1008",
 "playerFirstName": "Rajeev",
 "playerLastName": "Thanga",
 "playerAverage": "189.99",
 "playerCountry": "India",
 "playerType": "BigData1QACricketer",
 "playerBest": "99.100"
}
PUT /players/cricket/10
{
 "playerId": "1009",
 "playerFirstName": "Robin",
 "playerLastName": "Singh",
 "playerAverage": "199.99",
 "playerCountry": "India",
 "playerType": "LeftHand All Rounder",
 "playerBest": "100"
}

After this, I verified that the data is present in the index through the head plugin.

[Screenshot: players index in the Elasticsearch head plugin]

Step 4: Spark SQL DataFrame code for the ES-to-Hive integration

/*Below APIs are based on Spark 1.4.1 */
scala> val esConfig = Map("pushdown" -> "true", "es.nodes" -> "localhost", "es.port" -> "9200")
scala> val readPlayerESindex = sqlContext.read.format("org.elasticsearch.spark.sql").options(esConfig).load("players/cricket")
scala> readPlayerESindex.printSchema()
/*
root
 |-- playerAverage: string (nullable = true)
 |-- playerBest: string (nullable = true)
 |-- playerCountry: string (nullable = true)
 |-- playerFirstName: string (nullable = true)
 |-- playerId: string (nullable = true)
 |-- playerLastName: string (nullable = true)
 |-- playerType: string (nullable = true) */
scala> readPlayerESindex.take(10).foreach(println)
/*
16/01/30 19:09:08 INFO DAGScheduler: ResultStage 0 (take at <console>:24) finished in 1.210 s
16/01/30 19:09:08 INFO DAGScheduler: Job 0 finished: take at <console>:24, took 1.415645 s
[70.4,205,India,Sachin,1000,Tendulkar,Batsman]
[50,121,South Africa,Jonty,1001,Rhodes,Best Filder]
[99,400,India,Giri,1002,Varatharajan,BigDataCricketer]
[110,399,India,Malaikannan,1003,Sankara,PredictiveModelCricketer]
[99.9,InfiniteValue,India,Mathan,1004,Pillai,ProdSupportCricketer]
[99.7,199,India,Singh,1005,Jaswinder,SaamaLeadCricketer]
[599,299,India,Abhishek,1006,Peraka,BigDataDirectCricketer]
[199,99.99,India,Parthiban,1007,Part1,BigData10yrsCricketer]
[189.99,99.100,India,Rajeev,1008,Thanga,BigData1QACricketer]
[199.99,100,India,Robin,1009,Singh,LeftHand All Rounder] */
scala> readPlayerESindex.registerTempTable("players")
scala> sqlContext.sql("create table imdp.playersESHiveParq (playerAverage string,playerBest string,playerCountry string,playerFirstName string,playerId string,playerLastName string,playerType string) row format delimited fields terminated by ',' stored as parquet")
/*
res3: org.apache.spark.sql.DataFrame = [result: string] */
//I got into Permission Issues. So, I handled in a different way below. But If I would have had proper permission to create a table, I can simply use the below way
//sqlContext.sql("insert overwrite table imdp.playersESHiveParq select playerAverage ,playerBest ,playerCountry ,playerFirstName ,playerId ,playerLastName ,playerType from players")
//Caused by: MetaException(message:java.security.AccessControlException: Permission denied: user=hive, access=WRITE, inode="/apps/hive/warehouse/playerseshive":xgfp2ram:hdfs:drwxr-xr-x
//Another Easiest Way to Store the ES Data In Hive Table. But you should have created the table Before performing this API.
//readPlayerESindex.saveAsTable("imdp.playersESHiveParq")
scala> readPlayerESindex.write.parquet("/user/horf/playerESHiveParq")
scala> sqlContext.sql("load data inpath '/user/horf/playerESHiveParq' into table imdp.playersESHiveParq")

Step 5: View the final data through the Hive table

scala> sqlContext.sql("select * from imdp.playersESHiveParq").take(10).foreach(println)
/*Output from Hive Table
[70.4,205,India,Sachin,1000,Tendulkar,Batsman]
[50,121,South Africa,Jonty,1001,Rhodes,Best Filder]
[99,400,India,Giri,1002,Varatharajan,BigDataCricketer]
[110,399,India,Malaikannan,1003,Sankara,PredictiveModelCricketer]
[99.9,InfiniteValue,India,Mathan,1004,Pillai,ProdSupportCricketer]
[99.7,199,India,Singh,1005,Jaswinder,SaamaLeadCricketer]
[599,299,India,Abhishek,1006,Peraka,BigDataDirectCricketer]
[199,99.99,India,Parthiban,1007,Part1,BigData10yrsCricketer]
[189.99,99.100,India,Rajeev,1008,Thanga,BigData1QACricketer]
[199.99,100,India,Robin,1009,Singh,LeftHand All Rounder] */

/*Use this type of spark-shell command:

spark-shell --master yarn-client --jars /app/data/workspace/elasticsearch-spark_2.10-2.1.1.jar --executor-cores 1 --executor-memory 15g --driver-memory 20g --queue default --num-executors 30

*/

Download the jar file from mvnrepository or use the build.sbt dependencies below.

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.5.2",
  "org.apache.spark" % "spark-sql_2.10" % "1.5.2",
  "org.apache.hadoop" % "hadoop-common" % "2.6.0",
  "org.apache.spark" % "spark-hive_2.10" % "1.5.2",
  "org.apache.spark" % "spark-yarn_2.10" % "1.5.2",
  "org.apache.spark" % "spark-streaming_2.10" % "1.5.2",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.2",
  "com.fasterxml.jackson.core" % "jackson-core" % "2.6.1",
  "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.1.1",
  "org.elasticsearch" % "elasticsearch" % "1.7.2",
  "net.sf.opencsv" % "opencsv" % "2.3",
  "org.apache.hadoop" % "hadoop-streaming" % "1.0.4"
)


How to Ingest Data from Spark into an Elasticsearch Index?

I was trying to do a quick POC on data ingestion from Spark to Elasticsearch nodes, and I was finally able to achieve the results. The program below ingested 829k records in under 3 minutes using a local memory configuration. It automatically creates the index in Elasticsearch and applies dynamic mapping by default.

For the Elasticsearch node configuration, please refer to my other blog post here.

Sample Data to be Indexed:

Giri Varatharajan 22
Varatharajan Giri 22
Mathan Pillai 22
Keerthi Vaasan 20
Rajeev Thanga 25
Parthi Saama 22
Parthi Cogni 25
Mannar Samy 35
Gaurav Prasad 40
Spark SQL 150
Spark Streaming 150

Scala Code:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

/**
 * Created by Giri R Varatharajan on 9/18/2015.
 */
object ElasticSearch {
  def main(args: Array[String]) {
    System.setProperty("hadoop.home.dir", "D:\\hadoop\\hadoop-common-2.2.0-bin-master\\")
    val sparkconf = new SparkConf().setAppName("ElasticSearch").setMaster("local").set("es.index.auto.create", "true")
      .set("es.nodes", "192.168.56.102").set("es.port", "9201").set("es.http.timeout", "5m").set("es.scroll.size", "50")
    val sc = new SparkContext(sparkconf)

    //Simple index creation with sample data
    //    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    //    val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
    //    sc.makeRDD(Seq(numbers,airports)).saveToEs("spark_example1/docs")

    //Simple index creation with sample data present in a file
    case class Person(name: String, surname: String, age: Int)
    val people = sc.textFile("D:\\RnD\\Workspace\\scala\\TestSpark\\testData\\peoplefile").map(_.split(" ")).map(p => Person(p(0), p(1), p(2).trim.toInt))
    people.saveToEs("spark_example1/people")

    //Connect to ES through connection parameters provided separately in a configuration value.

    //Spark 1.3 style
    //    val ESConfigOptions = Map("pushdown" -> "true", "es.nodes" -> "192.168.56.102", "es.port" -> "9201")
    //    val sqlc = new SQLContext(sc)
    //    val dataFrameLayout = sqlc.read.format("org.elasticsearch.spark.sql").options(ESConfigOptions).load("spark_example1/people")
    //    dataFrameLayout.registerTempTable("people")
    //    sqlc.sql("select * from people").collect().foreach(println)

    //Primary option to read and display the ES index and the data present in it

    //Spark 1.3 style
    //      val sqlc = new SQLContext(sc)
    //      val sarDF = sqlc.esDF("spark_example3/people")
    //      sarDF.registerTempTable("people")
    //      sqlc.sql("select * from people").collect().foreach(println)
  }
}

The highlighted lines actually do the ingestion; the remaining commented code is provided for your reference.

[Screenshot: spark_example1/people index in the head plugin]

Please refer to the spark_example1/people index in the image.

Enterprise Data Search (Lucene API, Java Example)

A good write-up on Lucene and other search basics…

Gaurav Prasad

Search ??

Every time it comes to my mind why this word is given so much emphasis. But I got to know the actual reason when I moved to Big Data world. When one has the data with a volume size in terabyte or petabyte, it’s important to have efficient search mechanism, as no one wants to wait to get a search result after a day or two.

Say if you search something over Google and it says, your search request is accepted, we will return the result within five working days, how many of you will like to use Google. I am sure software developer and programmer will stop using it on the spot.

So do we use Google search only because of response time?
If anyone asks me, my answer will be NO !!!

Google is the most used search engine because it has many search feature with…
