Kafka => Spark Streaming => Elasticsearch

In the last Netcat => Spark Streaming => Elasticsearch tutorial, you saw data flow from a Netcat Unix stream to Elasticsearch through Spark Streaming.

In this tutorial, I am going to walk you through some basics of Apache Kafka and how to move data into and out of it. Then we will see how Spark Streaming can consume the data produced by Kafka. Finally, we will see how the Spark Streaming process can feed this data to Elasticsearch live.

What is Kafka:

Apache Kafka is a distributed publish-subscribe messaging system. It was originally developed at LinkedIn and later became part of the Apache project. Kafka is a fast, scalable, partitioned and replicated commit log service that is distributed by design.

This URL helped me understand Kafka better.

Procedure to Install Kafka on Mac OS:

If you are a Mac, Ubuntu, or Red Hat user, you can easily do it with brew install kafka once you have Homebrew (or Linuxbrew) installed on your machine. For other operating systems, please review the kafka.apache.org website.

Once it's installed, cd into the /usr/local/Cellar/kafka/0.9.0.1/bin directory and start the Kafka services as below.

Similarly, cd into the /usr/local/Cellar/elasticsearch-2.3.3/bin/ directory and start the Elasticsearch service as below.

#! /bin/bash

#Provide Kafka, Elastic Installed Location
export kafkaLocation=/usr/local/Cellar/kafka/0.9.0.1/bin
export elasticLocation=/usr/local/Cellar/elasticsearch-2.3.3/bin
export kafkatopicName=wordcounttopic

#Start All Kafka Services
$kafkaLocation/zookeeper-server-start /usr/local/Cellar/kafka/0.9.0.1/libexec/config/zookeeper.properties &
$kafkaLocation/kafka-server-start /usr/local/Cellar/kafka/0.9.0.1/libexec/config/server.properties &
$elasticLocation/elasticsearch &

echo 'Kafka Elasticsearch Services started'

Start the Zookeeper Services:

./zookeeper-server-start /usr/local/Cellar/kafka/0.9.0.1/libexec/config/zookeeper.properties

[Screenshot: Zookeeper server startup output]

Start the Kafka Server as below:

./kafka-server-start /usr/local/Cellar/kafka/0.9.0.1/libexec/config/server.properties

[Screenshot: Kafka server startup output]
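If automatic topic creation is disabled on your broker, create the topic before starting the producer. A sketch using the kafka-topics script from the same install (3 partitions match the offsets seen later in the Spark logs; adjust to your setup):

$kafkaLocation/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic $kafkatopicName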

Start the Kafka Producer Topic:

$kafkaLocation/kafka-console-producer --broker-list localhost:9092 --topic $kafkatopicName

I typed the below words in the producer console.

Hello Kafka from Spark Streaming Learning Kafka is fun. and learning kafka with spark streaming is more fun and even when the destination is elasticsearch it will be more fun

[Screenshot: Kafka console producer with the typed words]

Now we will see how Spark consumes this data through its built-in Kafka consumer and sends it to Elasticsearch.

Spark Program: Write this in IntelliJ:

I referred to the code from the Databricks documentation and tweaked it a bit for the Elasticsearch indexing.
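A minimal sketch of the program, assuming Spark 1.6.1 with spark-streaming-kafka_2.10 and elasticsearch-spark_2.10 on the classpath. It is reconstructed around the pieces visible in the driver logs below (a direct Kafka stream on wordcounttopic, flatMap/map, updateStateByKey with a checkpoint directory under the Kafka install, wordcount.show(), and a save into kafkawordcount/kwc); treat it as an approximation, the exact code I ran may differ slightly:

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.elasticsearch.spark.sql._

object KafkaSSCtoES {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("KafkaSSCtoES")
      .set("es.nodes", "localhost")
      .set("es.port", "9200")
      .set("es.index.auto.create", "true")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val ssc = new StreamingContext(sc, Seconds(2))
    ssc.checkpoint("/usr/local/Cellar/kafka/0.9.0.1/checkpoint")

    // Receiver-less (direct) stream from the wordcounttopic topic
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("wordcounttopic"))

    // Running word count across batches (updateStateByKey needs the checkpoint set above)
    val updateCounts = (values: Seq[Int], state: Option[Int]) => Some(values.sum + state.getOrElse(0))
    val wordCounts = messages.flatMap(_._2.split(" ")).map((_, 1)).updateStateByKey(updateCounts)

    // For every micro-batch: show the counts and index them into kafkawordcount/kwc
    wordCounts.foreachRDD { rdd =>
      val wordcount = rdd.toDF("word", "count")
      wordcount.show()
      wordcount.saveToEs("kafkawordcount/kwc")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}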

Once you click Run, you will see the output (wordcount.show()) in the Driver Console as below.

You will see standard output like the below. The empty word-count table keeps printing until data is received from Kafka.

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/06/16 21:30:43 INFO SparkContext: Running Spark version 1.6.1
16/06/16 21:30:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/16 21:30:45 INFO SecurityManager: Changing view acls to: vgiridatabricks
16/06/16 21:30:45 INFO SecurityManager: Changing modify acls to: vgiridatabricks
16/06/16 21:30:45 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(vgiridatabricks); users with modify permissions: Set(vgiridatabricks)
16/06/16 21:30:45 INFO Utils: Successfully started service 'sparkDriver' on port 49990.
16/06/16 21:30:45 INFO Slf4jLogger: Slf4jLogger started
16/06/16 21:30:45 INFO Remoting: Starting remoting
16/06/16 21:30:45 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@10.10.10.111:49991]
16/06/16 21:30:45 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 49991.
16/06/16 21:30:45 INFO SparkEnv: Registering MapOutputTracker
16/06/16 21:30:45 INFO SparkEnv: Registering BlockManagerMaster
16/06/16 21:30:45 INFO DiskBlockManager: Created local directory at /private/var/folders/4q/fjc7vkmn4wn_m4fynrvxk2hc0000gn/T/blockmgr-568204eb-276e-4be7-8b0d-5d451f439eef
16/06/16 21:30:45 INFO MemoryStore: MemoryStore started with capacity 2.4 GB
16/06/16 21:30:45 INFO SparkEnv: Registering OutputCommitCoordinator
16/06/16 21:30:46 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/06/16 21:30:46 INFO SparkUI: Started SparkUI at http://10.10.10.111:4040
16/06/16 21:30:46 INFO Executor: Starting executor ID driver on host localhost
16/06/16 21:30:46 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 49992.
16/06/16 21:30:46 INFO NettyBlockTransferService: Server created on 49992
16/06/16 21:30:46 INFO BlockManagerMaster: Trying to register BlockManager
16/06/16 21:30:46 INFO BlockManagerMasterEndpoint: Registering block manager localhost:49992 with 2.4 GB RAM, BlockManagerId(driver, localhost, 49992)
16/06/16 21:30:46 INFO BlockManagerMaster: Registered BlockManager
16/06/16 21:30:46 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
16/06/16 21:30:46 INFO VerifiableProperties: Verifying properties
16/06/16 21:30:46 INFO VerifiableProperties: Property group.id is overridden to 
16/06/16 21:30:46 INFO VerifiableProperties: Property zookeeper.connect is overridden to 
Creating function called to create new StreamingContext
16/06/16 21:30:47 INFO WriteAheadLogManager  for Thread: Recovered 2 write ahead log files from file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/receivedBlockMetadata
16/06/16 21:30:47 INFO StateDStream: Checkpoint interval automatically set to 10000 ms
16/06/16 21:30:47 INFO ForEachDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.ForEachDStream@5626d18c
16/06/16 21:30:47 INFO StateDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.StateDStream@5634a861
16/06/16 21:30:47 INFO MappedDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.MappedDStream@66236a0a
16/06/16 21:30:47 INFO FlatMappedDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FlatMappedDStream@1dd74143
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.kafka.DirectKafkaInputDStream@7a45d714
16/06/16 21:30:47 INFO ForEachDStream: metadataCleanupDelay = -1
16/06/16 21:30:47 INFO StateDStream: metadataCleanupDelay = -1
16/06/16 21:30:47 INFO MappedDStream: metadataCleanupDelay = -1
16/06/16 21:30:47 INFO FlatMappedDStream: metadataCleanupDelay = -1
16/06/16 21:30:47 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Slide time = 2000 ms
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Checkpoint interval = null
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Remember duration = 60000 ms
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka.DirectKafkaInputDStream@7a45d714
16/06/16 21:30:47 INFO FlatMappedDStream: Slide time = 2000 ms
16/06/16 21:30:47 INFO FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/16 21:30:47 INFO FlatMappedDStream: Checkpoint interval = null
16/06/16 21:30:47 INFO FlatMappedDStream: Remember duration = 60000 ms
16/06/16 21:30:47 INFO FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@1dd74143
16/06/16 21:30:47 INFO MappedDStream: Slide time = 2000 ms
16/06/16 21:30:47 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/16 21:30:47 INFO MappedDStream: Checkpoint interval = null
16/06/16 21:30:47 INFO MappedDStream: Remember duration = 60000 ms
16/06/16 21:30:47 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@66236a0a
16/06/16 21:30:47 INFO StateDStream: Slide time = 2000 ms
16/06/16 21:30:47 INFO StateDStream: Storage level = StorageLevel(false, true, false, false, 1)
16/06/16 21:30:47 INFO StateDStream: Checkpoint interval = 10000 ms
16/06/16 21:30:47 INFO StateDStream: Remember duration = 60000 ms
16/06/16 21:30:47 INFO StateDStream: Initialized and validated org.apache.spark.streaming.dstream.StateDStream@5634a861
16/06/16 21:30:47 INFO ForEachDStream: Slide time = 2000 ms
16/06/16 21:30:47 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/16 21:30:47 INFO ForEachDStream: Checkpoint interval = null
16/06/16 21:30:47 INFO ForEachDStream: Remember duration = 60000 ms
16/06/16 21:30:47 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@5626d18c
16/06/16 21:30:47 INFO RecurringTimer: Started timer for JobGenerator at time 1466137848000
16/06/16 21:30:47 INFO JobGenerator: Started JobGenerator at 1466137848000 ms
16/06/16 21:30:47 INFO JobScheduler: Started JobScheduler
16/06/16 21:30:47 INFO StreamingContext: StreamingContext started
16/06/16 21:30:48 INFO StateDStream: Time 1466137846000 ms is invalid as zeroTime is 1466137846000 ms and slideDuration is 2000 ms and difference is 0 ms
16/06/16 21:30:48 INFO VerifiableProperties: Verifying properties
16/06/16 21:30:48 INFO VerifiableProperties: Property group.id is overridden to 
16/06/16 21:30:48 INFO VerifiableProperties: Property zookeeper.connect is overridden to 
16/06/16 21:30:48 INFO JobScheduler: Added jobs for time 1466137848000 ms
16/06/16 21:30:48 INFO JobGenerator: Checkpointing graph for time 1466137848000 ms
16/06/16 21:30:48 INFO DStreamGraph: Updating checkpoint data for time 1466137848000 ms
16/06/16 21:30:48 INFO JobScheduler: Starting job streaming job 1466137848000 ms.0 from job set of time 1466137848000 ms
16/06/16 21:30:48 INFO DStreamGraph: Updated checkpoint data for time 1466137848000 ms
16/06/16 21:30:48 INFO CheckpointWriter: Submitted checkpoint of time 1466137848000 ms writer queue
16/06/16 21:30:48 INFO CheckpointWriter: Saving checkpoint for time 1466137848000 ms to file 'file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1466137848000'
16/06/16 21:30:48 INFO CheckpointWriter: Deleting file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1465878822000.bk
16/06/16 21:30:48 INFO CheckpointWriter: Checkpoint for time 1466137848000 ms saved to file 'file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1466137848000', took 4209 bytes and 37 ms
16/06/16 21:30:49 INFO SparkContext: Starting job: show at KafkaSSCtoES.scala:71
16/06/16 21:30:49 INFO DAGScheduler: Registering RDD 2 (map at KafkaSSCtoES.scala:63)
16/06/16 21:30:49 INFO DAGScheduler: Got job 0 (show at KafkaSSCtoES.scala:71) with 1 output partitions
16/06/16 21:30:49 INFO DAGScheduler: Final stage: ResultStage 1 (show at KafkaSSCtoES.scala:71)
16/06/16 21:30:49 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
16/06/16 21:30:49 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
16/06/16 21:30:49 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at map at KafkaSSCtoES.scala:63), which has no missing parents
16/06/16 21:30:49 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 4.8 KB, free 4.8 KB)
16/06/16 21:30:49 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.5 KB, free 7.3 KB)
16/06/16 21:30:49 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:49992 (size: 2.5 KB, free: 2.4 GB)
16/06/16 21:30:49 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/06/16 21:30:49 INFO DAGScheduler: Submitting 3 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at map at KafkaSSCtoES.scala:63)
16/06/16 21:30:49 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
16/06/16 21:30:49 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,ANY, 2009 bytes)
16/06/16 21:30:49 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/06/16 21:30:49 INFO KafkaRDD: Beginning offset 4 is the same as ending offset skipping wordcounttopic 0
16/06/16 21:30:49 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1158 bytes result sent to driver
16/06/16 21:30:49 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,ANY, 2009 bytes)
16/06/16 21:30:49 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
16/06/16 21:30:49 INFO KafkaRDD: Beginning offset 5 is the same as ending offset skipping wordcounttopic 1
16/06/16 21:30:49 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1158 bytes result sent to driver
16/06/16 21:30:49 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 90 ms on localhost (1/3)
16/06/16 21:30:49 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, partition 2,ANY, 2009 bytes)
16/06/16 21:30:49 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
16/06/16 21:30:49 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 11 ms on localhost (2/3)
16/06/16 21:30:49 INFO KafkaRDD: Beginning offset 4 is the same as ending offset skipping wordcounttopic 2
16/06/16 21:30:49 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 1158 bytes result sent to driver
16/06/16 21:30:49 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 9 ms on localhost (3/3)
16/06/16 21:30:49 INFO DAGScheduler: ShuffleMapStage 0 (map at KafkaSSCtoES.scala:63) finished in 0.114 s
16/06/16 21:30:49 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/06/16 21:30:49 INFO DAGScheduler: looking for newly runnable stages
16/06/16 21:30:49 INFO DAGScheduler: running: Set()
16/06/16 21:30:49 INFO DAGScheduler: waiting: Set(ResultStage 1)
16/06/16 21:30:49 INFO DAGScheduler: failed: Set()
16/06/16 21:30:49 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[8] at show at KafkaSSCtoES.scala:71), which has no missing parents
16/06/16 21:30:49 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 9.4 KB, free 16.8 KB)
16/06/16 21:30:49 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.7 KB, free 21.4 KB)
16/06/16 21:30:49 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:49992 (size: 4.7 KB, free: 2.4 GB)
16/06/16 21:30:49 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/06/16 21:30:49 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[8] at show at KafkaSSCtoES.scala:71)
16/06/16 21:30:49 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/06/16 21:30:49 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 3, localhost, partition 0,PROCESS_LOCAL, 1894 bytes)
16/06/16 21:30:49 INFO Executor: Running task 0.0 in stage 1.0 (TID 3)
16/06/16 21:30:49 INFO CacheManager: Partition rdd_4_0 not found, computing it
16/06/16 21:30:49 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:49992 in memory (size: 2.5 KB, free: 2.4 GB)
16/06/16 21:30:49 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 3 blocks
16/06/16 21:30:49 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 7 ms
16/06/16 21:30:49 INFO MemoryStore: Block rdd_4_0 stored as bytes in memory (estimated size 4.0 B, free 14.1 KB)
16/06/16 21:30:49 INFO BlockManagerInfo: Added rdd_4_0 in memory on localhost:49992 (size: 4.0 B, free: 2.4 GB)
16/06/16 21:30:50 INFO GenerateUnsafeProjection: Code generated in 175.318511 ms
16/06/16 21:30:50 INFO JobScheduler: Added jobs for time 1466137850000 ms
16/06/16 21:30:50 INFO JobGenerator: Checkpointing graph for time 1466137850000 ms
16/06/16 21:30:50 INFO DStreamGraph: Updating checkpoint data for time 1466137850000 ms
16/06/16 21:30:50 INFO DStreamGraph: Updated checkpoint data for time 1466137850000 ms
16/06/16 21:30:50 INFO CheckpointWriter: Submitted checkpoint of time 1466137850000 ms writer queue
16/06/16 21:30:50 INFO CheckpointWriter: Saving checkpoint for time 1466137850000 ms to file 'file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1466137850000'
16/06/16 21:30:50 INFO GenerateSafeProjection: Code generated in 12.79301 ms
16/06/16 21:30:50 INFO Executor: Finished task 0.0 in stage 1.0 (TID 3). 1915 bytes result sent to driver
16/06/16 21:30:50 INFO CheckpointWriter: Deleting file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1465878822000
16/06/16 21:30:50 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 3) in 493 ms on localhost (1/1)
16/06/16 21:30:50 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
16/06/16 21:30:50 INFO CheckpointWriter: Checkpoint for time 1466137850000 ms saved to file 'file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1466137850000', took 4259 bytes and 27 ms
16/06/16 21:30:50 INFO DAGScheduler: ResultStage 1 (show at KafkaSSCtoES.scala:71) finished in 0.496 s
16/06/16 21:30:50 INFO DAGScheduler: Job 0 finished: show at KafkaSSCtoES.scala:71, took 0.865830 s
+----+-----+
|word|count|
+----+-----+
+----+-----+

Now I hit Enter in the Kafka producer console and check the driver output in IntelliJ.

[Screenshot: driver output in IntelliJ after the first Kafka batch]

Now see the output in the Elasticsearch head plugin:

Start the elasticsearch service as below.

$ cd /usr/local/Cellar/elastic*/bin

./elasticsearch

As per the above program, we are creating the index name and type as kafkawordcount/kwc. The snapshot below shows the head plugin before the Spark Streaming job runs; you can see there is no index called kafkawordcount yet.

[Screenshot: head plugin before the job runs, with no kafkawordcount index]

 

See the output in the Elasticsearch head plugin after the first hit of the below words in the Kafka producer console.

Hello Kafka from Spark Streaming Learning Kafka is fun. and learning kafka with spark streaming is more fun and even when the destination is elasticsearch it will be more fun

[Screenshot: kafkawordcount index after the first batch]

See the full output:

[Screenshot: full word-count output in the head plugin]
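If you prefer the command line to the head plugin, the same index can also be checked with a quick query against the Elasticsearch search API (just a sanity check, assuming the default port):

curl 'http://localhost:9200/kafkawordcount/_search?pretty'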

Hope you enjoyed this tutorial.

 

 

Netcat => Spark Streaming => ElasticSearch

In this tutorial, let me walk you through how words typed into a Unix Netcat channel are streamed through Spark and written live into an Elasticsearch index as (word, count) pairs. Here I am relying on the Spark-to-Elasticsearch ingestion to automatically create the index during the ingestion process.

The steps I followed:

  1. Install Elasticsearch (brew install elasticsearch)
  2. Install the mobz/head plugin (https://github.com/mobz/elasticsearch-head)
  3. Open a Netcat channel as a Unix stream and type some words
  4. Launch a Spark program with the org.apache.spark.streaming._ libraries imported
    1. Read the socketTextStream through the Spark streaming context via port 9999
    2. flatMap the words
    3. For each DStream RDD, convert it to a DataFrame and register a temp table
    4. Save each DataFrame to an Elasticsearch index
  5. Open http://localhost:9200/_plugin/head and keep refreshing the page to see the latest word, count columns in the ES index.

No Index is present:

[Screenshot: head plugin with no index present]

Create the below program in IntelliJ and execute it:
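A minimal sketch of such a program along the lines of the steps above, assuming Spark 1.6.1 and elasticsearch-spark_2.10 on the classpath; the index/type name netcatwordcount/words is just an illustrative choice, and the exact code I ran may differ slightly:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.spark.sql._

object NetcatSSCtoES {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]") // at least 2 threads: one for the socket receiver, one for processing
      .setAppName("NetcatSSCtoES")
      .set("es.nodes", "localhost")
      .set("es.port", "9200")
      .set("es.index.auto.create", "true")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val ssc = new StreamingContext(sc, Seconds(2))

    // Read the Netcat socket stream on port 9999
    val lines = ssc.socketTextStream("localhost", 9999)

    // flatMap into words and count them per batch
    val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // Convert each RDD to a DataFrame, register a temp table,
    // and save the batch into an Elasticsearch index
    wordCounts.foreachRDD { rdd =>
      val df = rdd.toDF("word", "count")
      df.registerTempTable("wordcount")
      df.saveToEs("netcatwordcount/words") // illustrative index/type name
    }

    ssc.start()
    ssc.awaitTermination()
  }
}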

After executing it in IntelliJ, you will see a streaming context launched locally and the job keeps running in the Run console.

Open Netcat, type some words, and press Enter:

$ nc -lk 9999

Iteration 1:

(23 Documents must be stored)

[Screenshot: words typed in the Netcat console (iteration 1)]

Now see the words in Elasticsearch with the index auto-created.

[Screenshots: Elasticsearch head plugin showing the auto-created index and the word counts]

Iteration 2:

Enter some other words and Press Enter

[Screenshot: words typed in the Netcat console (iteration 2)]

Now, see it in the Elastic Search Index:

You can see the document count increased to 49, as below.

[Screenshots: Elasticsearch head plugin showing 49 documents]

This was all possible with the magic of the Spark + DataFrame + ES APIs. In the same way, you can write to Cassandra, MongoDB, HBase, Oracle, MySQL, or any other destination database or system.

Now you can stop the Spark Streaming job. Hope you enjoyed this tutorial.

 

 

How to Transpose Columns to Rows in Spark Dataframe

Let’s say you have input like this.

[Screenshot: input DataFrame]

and you want the output like the below, meaning all these columns have to be transposed to rows using a Spark DataFrame approach. As you know, there is no direct way to do a transpose in Spark. In some cases we can use pivot, but I haven't tried that part yet.

[Screenshot: desired transposed output]

What have I done?

  1. Created a Seq and converted it to a DataFrame with 3 column names assigned
  2. Used the collect_list function to combine all the values of each column into an array
  3. Split the array using a custom delimiter (':')
  4. Read each element of the array and output it as a separate column in SQL; did the same for all columns
  5. Union all the converted columns and create the final DataFrame
  6. I have used the Spark SQL approach here
  7. Please note that this approach only works for a small set of columns. There may be easier ways too; if you know one, please reply in a comment here.

IntelliJ Version of the Code:

For this example, I used Spark 1.6.1.

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by vgiridatabricks on 5/28/16.
  */
object SparkTranspose {
  def main(args: Array[String]) : Unit = {
    val conf = new SparkConf()
      .setAppName("csvParser")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._

    val df = sc.parallelize(Seq(("Titanic","IronMan","JungleBook"),("1","10","20"),("2","30","40"),("1","30","20"))).toDF("a","b","c")
    df.registerTempTable("transpose")

    // Collapse each column into a single ':'-delimited string
    sqlContext.sql("select concat_ws(':',a.cola) as cola_new,concat_ws(':',a.colb) as colb_new,concat_ws(':',a.colc) as colc_new from (select collect_list(a) as cola, collect_list(b) as colb,collect_list(c) as colc from transpose)a").registerTempTable("tablesplitter")

    // Split each collapsed column back into one row of four values and union the rows
    sqlContext.sql(
      """
      select split(cola_new,':') [0] as col1,split(cola_new,':') [1] as col2,split(cola_new,':') [2] as col3,split(cola_new,':') [3] as col4 from tablesplitter
      union all
      select split(colb_new,':') [0] as col1,split(colb_new,':') [1] as col2,split(colb_new,':') [2] as col3,split(colb_new,':') [3] as col4 from tablesplitter
      union all
      select split(colc_new,':') [0] as col1,split(colc_new,':') [1] as col2,split(colc_new,':') [2] as col3,split(colc_new,':') [3] as col4 from tablesplitter
      """)
      .show()
  }
}

Output:

[Screenshot: transposed output]

Hope you Enjoyed this Blog!..

Generate Unique IDs for Each Row in a Spark DataFrame

Let’s see how to create Unique IDs for each of the rows present in a Spark DataFrame.

Steps to produce this:

Option 1 => Using the monotonicallyIncreasingId or zipWithUniqueId methods

  1. Create a DataFrame from a parallel collection
  2. Apply a Spark DataFrame method to generate monotonically increasing unique IDs
import org.apache.spark.sql.functions._ 
val df = sc.parallelize(Seq(("Databricks", 20000), ("Spark", 100000), ("Hadoop", 3000))).toDF("word", "count") 
df.withColumn("uniqueID",monotonicallyIncreasingId).show()

[Screenshot: DataFrame with the uniqueID column]

import org.apache.spark.sql.types.{StructType, StructField, LongType}
import org.apache.spark.sql.Row
val df = sc.parallelize(Seq(("Databricks", 20000), ("Spark", 100000), ("Hadoop", 3000))).toDF("word", "count")
val wcschema = df.schema
val inputRows = df.rdd.zipWithUniqueId.map{
   case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
val wcID = sqlContext.createDataFrame(inputRows, StructType(StructField("id", LongType, false) +: wcschema.fields))
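Calling show() on the new DataFrame displays the generated id column, as in the screenshot below:

wcID.show()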

[Screenshot: DataFrame with the generated id column]

Option 2 => Use the row_number Function

With PartitionBy Column:

val df = sc.parallelize(Seq(("Databricks", 20000), ("Spark", 100000), ("Hadoop", 3000))).toDF("word", "count")
df.createOrReplaceTempView("wordcount")
val tmpTable = sqlContext.sql("select row_number() over (partition by word order by count) as rnk,word,count from wordcount")
tmpTable.show()

[Screenshot: row_number output with partition by word]

Without PartitionBy Column:

val tmpTable1 = sqlContext.sql("select row_number() over (order by count) as rnk,word,count from wordcount")
tmpTable1.show()

[Screenshot: row_number output without partitionBy]

 

How to Connect Cassandra and Spark with Guava Shade dependency using SBT

What is Cassandra?

Cassandra is a distributed database for managing large amounts of structured data across many commodity servers, while providing highly available service and no single point of failure.  Cassandra offers capabilities that relational databases and other NoSQL databases simply cannot match such as: continuous availability, linear scale performance, operational simplicity and easy data distribution across multiple data centers and cloud availability zones.
Why Cassandra?
Do you need a more flexible data model than what’s offered in the relational database world? Would you like to start with a database you know can scale to meet any number of concurrent user connections and/or data volume size and run blazingly fast? Have you been needing a database that has no single point of failure and one that can easily distribute data among multiple geographies, data centers, and the cloud? Well, that’s Cassandra.

Installing in MacOS:

$ brew install cassandra

$ cd /usr/local/Cellar/cassandra/3.5/bin

$ ./cqlsh

[Screenshot: cqlsh session with the sample emp table]

Let's connect to this table from Spark and read it as an RDD and a DataFrame:

import com.datastax.spark.connector._
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by vgiridatabricks on 5/21/16.
  */
object SparkCassandra {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "127.0.0.1")
      // .set("spark.cassandra.auth.username", "username")
      // .set("spark.cassandra.auth.password", "password")
      .setMaster("local").setAppName("SparkCassandra")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Just test Spark Cassandra connectivity
    val tableConn = sc.cassandraTable("sparktestcassandra", "emp")
    print(tableConn)

    // Read from the Cassandra table as a DataFrame
    val df = sqlContext
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "emp", "keyspace" -> "sparktestcassandra"))
      .load()
    df.show()
  }
}

df.show() --> see the output in IntelliJ

[Screenshot: df.show() output in IntelliJ]

Writing RDD to Cassandra:

Just create a table in Cassandra to hold the data:

cqlsh:sparktestcassandra> create table wordcount (word varchar primary key, count int);

//Write to Cassandra Table
 val sampleRdd = sc.parallelize(Array(("Databricks", 101), ("Spark", 110), ("Big Data", 10),("Hadoop",40),("Falcon",50)))
 sampleRdd.saveToCassandra("sparktestcassandra", "wordcount", SomeColumns("word", "count"))

[Screenshot: IntelliJ output for the Cassandra write]

You can see the data is inserted into the Cassandra table "wordcount".

cqlsh:sparktestcassandra> select * from wordcount;

[Screenshot: select * from wordcount output in cqlsh]
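Alternatively, the same write can go through the connector's DataFrame API. A minimal sketch, assuming spark-cassandra-connector 1.6.0-M2 (as in the build.sbt below) and the wordcount table created above:

import org.apache.spark.sql.SaveMode
import sqlContext.implicits._

// Build a DataFrame from the same sample data and append it to the existing table
val sampleDF = sampleRdd.toDF("word", "count")
sampleDF.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "wordcount", "keyspace" -> "sparktestcassandra"))
  .mode(SaveMode.Append)
  .save()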

How I created a Spark Cassandra Shaded Jar:

I have used the Spark Cassandra Connector available on GitHub, with one small modification: the Guava dependency is shaded. Please see below how I built the Guava shade.

  1. Check out the Spark Cassandra Connector code from GitHub and clone it into a local directory on your machine
  2. Add this in project/assembly.sbt: `addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")`
  3. Add the sbt-assembly plugin as a separate dependency so that you can define the assembly shading in build.sbt as below
  4. Make the below changes in the build.sbt file
  5. import sbt.Keys._
     import sbt._
     //import sbtassembly.AssemblyKeys._
     import sbtassembly.AssemblyPlugin.autoImport._

     version := "0.1-SNAPSHOT"
     scalaVersion := "2.10.5"
     name := "spark-cassandra-guava-shade"
     scalacOptions := Seq("-deprecation", "-unchecked", "-feature")

     libraryDependencies ++= Seq(
       "com.datastax.spark" %% "spark-cassandra-connector" % "1.6.0-M2"
     )

     // There is a conflict between the Guava versions in the Cassandra driver and Hadoop,
     // so the Guava package is shaded
     assemblyShadeRules in assembly := Seq(ShadeRule.rename("com.google.**" -> "shadeio.@1").inAll)

     assemblyJarName in assembly := s"${name.value}-${version.value}.jar"

     assemblyMergeStrategy in assembly := {
       case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
       case _ => MergeStrategy.first
     }
  6. Navigate to the project root directory and run sbt clean assembly so that you get a new jar with the Guava libraries shaded
  7. Manually include this jar as a dependency along with your Spark dependencies.

Please note this build.sbt creates only the Spark Cassandra Connector with the Guava dependency shaded. If you want to use the above Spark program, you will have to create a new SBT project with the Spark dependencies put together in build.sbt and add the shaded Spark Cassandra jar as a separate dependency.

That build.sbt file looks something like this.

import sbt.Keys._
import sbt._
//import sbtassembly.AssemblyKeys._
import sbtassembly.AssemblyPlugin.autoImport._

version := "0.1-SNAPSHOT"
organization := "com.giri.test"
scalaVersion := "2.10.5"
name := "spark-cassandra-guava-shade"
scalacOptions := Seq("-deprecation", "-unchecked", "-feature")

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.6.1",
  "org.apache.spark" % "spark-sql_2.10" % "1.6.1",
  "org.apache.spark" % "spark-hive_2.10" % "1.6.1",
  "org.apache.spark" % "spark-yarn_2.10" % "1.6.1",
  "com.databricks" % "spark-xml_2.10" % "0.2.0",
  "com.databricks" % "spark-csv_2.10" % "1.4.0",
  "org.apache.spark" % "spark-catalyst_2.10" % "1.6.1",
  "org.apache.spark" % "spark-mllib_2.10" % "1.6.1",
  "com.101tec" % "zkclient" % "0.8",
  "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.2.0",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1",
  "com.stratio.datasource" % "spark-mongodb_2.10" % "0.11.1",
  "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.5.1",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.6.0-M2"
)

// There is a conflict between the Guava versions in the Cassandra driver and Hadoop,
// so the Guava package is shaded
assemblyShadeRules in assembly := Seq(ShadeRule.rename("com.google.**" -> "shadeio.@1").inAll)

assemblyJarName in assembly := s"${name.value}-${version.value}.jar"

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
  case _ => MergeStrategy.first
}

What is an Uber Jar, and Why is a Shaded Dependency Required?

An uber-jar is a JAR that takes all dependencies, extracts their contents, and puts them together with the classes/resources of the project itself in one big JAR. Having such an uber-jar makes execution easy, because you need only one big JAR instead of tons of small JARs to run your app. It also eases distribution in some cases. sbt-assembly helps you create an uber-jar in a Scala-based project.

Creating an uber-jar for ease of deployment is one use case of the shade plugin. There are also other common use cases, which involve package renaming.

For example, suppose I am developing a Foo library, which depends on a specific version (e.g. 1.0) of a Bar library. Assume I cannot use another version of the Bar lib (because of API changes, or other technical issues, etc.). If I simply declare Bar:1.0 as Foo's dependency in Maven, it is possible to fall into a problem: a Qux project depends on Foo, and also on Bar:2.0 (and it cannot use Bar:1.0 because Qux needs a new feature of Bar:2.0). Here is the dilemma: should Qux use Bar:1.0 (with which Qux's code will not work) or Bar:2.0 (with which Foo's code will not work)?

In order to solve this problem, the developer of Foo can choose to use the shade plugin to rename its usage of Bar, so that all classes in the Bar:1.0 jar are embedded in the Foo jar, and the package of the embedded Bar classes is changed from com.bar to com.foo.bar. By doing so, Qux can safely depend on Bar:2.0, because Foo is no longer depending on Bar; it is using its own copy of the "altered" Bar located in another package.
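In sbt-assembly terms, that renaming is exactly what a shade rule expresses; a minimal sketch for the hypothetical Foo/Bar example (the package names are illustrative):

// In Foo's build.sbt: embed Bar and rename its packages so another Bar version can coexist
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.bar.**" -> "com.foo.bar.@1").inAll
)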

In my example, the Spark Cassandra Connector does not end up with Guava 16.0; instead it picks up the Guava classes from the Spark core packages. The reason I created the shaded jar is that I ran into the below error while using the Spark Cassandra Connector directly.

16/05/22 10:59:35 INFO Version: Elasticsearch Hadoop v2.2.0 [c11e37bde9]
Exception in thread "main" java.lang.ExceptionInInitializerError
    at com.datastax.spark.connector.cql.DefaultConnectionFactory$.clusterBuilder(CassandraConnectionFactory.scala:36)
    at com.datastax.spark.connector.cql.DefaultConnectionFactory$.createCluster(CassandraConnectionFactory.scala:88)
    ...
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.IllegalStateException: Detected Guava issue #1635 which indicates that a version of Guava less than 16.01 is in use.  This introduces codec resolution issues and potentially other incompatibility issues in the driver.  Please upgrade to Guava 16.01 or later.
    at com.datastax.driver.core.SanityChecks.checkGuava(SanityChecks.java:62)
    at com.datastax.driver.core.SanityChecks.check(SanityChecks.java:36)
    at com.datastax.driver.core.Cluster.<clinit>(Cluster.java:67)
    ... 21 more

It looks like Spark 1.6.0 uses Guava 14.0 (http://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10/1.6.0) but the Spark Cassandra Connector (on GitHub) uses Guava 16.0 (http://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector_2.10/1.6.0-M2). That is why shading is required here.

I have uploaded the Spark Cassandra Guava shaded jar here. You can use it along with your Spark dependencies.

References:

http://stackoverflow.com/questions/13620281/what-is-the-maven-shade-plugin-used-for-and-why-would-you-want-to-relocate-java

Hope you enjoyed this tutorial.

How to handle nested data/array of structures or multiple Explodes in Spark/Scala and PySpark:

Explode

explode() takes in an array (or a map) as an input and outputs the elements of the array (map) as separate rows. Hive UDTFs can be used in the SELECT expression list and as a part of LATERAL VIEW.

Those who are familiar with EXPLODE and LATERAL VIEW in Hive have probably tried the same in Spark. Some attempts might have failed, but here is a successful attempt I made with Spark 1.5.1.

Below is some data I prepared using Scala collections for this demo.

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions._

/**
  * Created by vgiridatabricks on 5/16/16.
  */
object ExplodeDemo {
  def main(args: Array[String]) : Unit = {
    val conf = new SparkConf()
      .setAppName("csvParser")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val df = sc.parallelize(Seq((1, Seq(2,3,4), Seq(5,6,7)), (2, Seq(3,4,5), Seq(6,7,8)), (3, Seq(4,5,6), Seq(7,8,9)))).toDF("a", "b", "c")

    // First explode: one row per element of b
    val df1 = df.select(df("a"), explode(df("b")).alias("b_columns"), df("c"))

    // Second explode on the already-exploded frame; the alias here is applied to the input
    // column, so the exploded output keeps the default name "col" (as in the output below)
    val df2 = df1.select(df1("a"), df1("b_columns"), explode(df1("c").alias("c_columns"))).show()
  }
}

df.show() --> Initial Collections

+---+---------+---------+
|  a|        b|        c|
+---+---------+---------+
|  1|[2, 3, 4]|[5, 6, 7]|
|  2|[3, 4, 5]|[6, 7, 8]|
|  3|[4, 5, 6]|[7, 8, 9]|
+---+---------+---------+

df1.show() --> First Exploded Collections

+---+---------+---------+
|  a|b_columns|        c|
+---+---------+---------+
|  1|        2|[5, 6, 7]|
|  1|        3|[5, 6, 7]|
|  1|        4|[5, 6, 7]|
|  2|        3|[6, 7, 8]|
|  2|        4|[6, 7, 8]|
|  2|        5|[6, 7, 8]|
|  3|        4|[7, 8, 9]|
|  3|        5|[7, 8, 9]|
|  3|        6|[7, 8, 9]|
+---+---------+---------+

df2.show() --> Second Exploded Collections

+---+---------+---+
|  a|b_columns|col|
+---+---------+---+
|  1|        2|  5|
|  1|        2|  6|
|  1|        2|  7|
|  1|        3|  5|
|  1|        3|  6|
|  1|        3|  7|
|  1|        4|  5|
|  1|        4|  6|
|  1|        4|  7|
|  2|        3|  6|
|  2|        3|  7|
|  2|        3|  8|
|  2|        4|  6|
|  2|        4|  7|
|  2|        4|  8|
|  2|        5|  6|
|  2|        5|  7|
|  2|        5|  8|
|  3|        4|  7|
|  3|        4|  8|
+---+---------+---+
only showing top 20 rows

Please note that if you do multiple explodes as part of the same DataFrame operation, Spark SQL will throw an error saying "AnalysisException: u'Only one generator allowed per select but Generate and and Explode found.;'". So apply explode to one column at a time, assign an alias, and do the second explode on the first exploded DataFrame.

PySpark Code to do the same Logic:
(I have taken Another List here)

from pyspark.sql import Row
from pyspark.sql.functions import explode

df = sqlContext.createDataFrame([Row(a=1, b=[1,2,3], c=[7,8,9]), Row(a=2, b=[4,5,6], c=[10,11,12])])
df1 = df.select(df.a, explode(df.b).alias("b_row"), df.c)
df2 = df1.select(df1.a, df1.b_row, explode(df1.c).alias("c_row")).show()

Final Outputs are attached from Databricks Notebook. Please use community edition of Databricks notebook if you like to easily learn/execute Spark Jobs.

How to connect MongoDB/BSON File and Spark

MongoDB:
Mongo DB is an open source document database that provides high performance, high availability and automatic scaling.  MongoDB obviates the need for an Object Relational Mapping to facilitate development.

A record in MongoDB is a document, which is a data structure composed of field and value pairs. MongoDB documents are similar to JSON objects. The values of fields may include other documents, arrays, and arrays of documents. MongoDB stores documents in collections. Collections are analogous to tables in relational databases. Unlike a table, however, a collection does not require its documents to have the same schema.

For more information about MongoDB, please visit here.
https://docs.mongodb.com/getting-started/shell/introduction/

Installation in OS X:

brew update
brew install mongodb
mkdir -p /data/db
chmod -R 777 /data/db
bin/mongod
bin/mongo
If the data directory is customized, use bin/mongod --dbpath <path to the data directory>

Available stable library for Spark-Mongo connectivity:
com.stratio.datasource:spark-mongodb_2.10

Add this in build.sbt
name := "SparkLatest"
version := "1.0"
scalaVersion := "2.10.5"  // matches the _2.10 artifacts below
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.6.1",
  "org.apache.spark" % "spark-sql_2.10" % "1.6.1",
  "org.apache.spark" % "spark-hive_2.10" % "1.6.1",
  "org.apache.spark" % "spark-yarn_2.10" % "1.6.1",
  "com.databricks" % "spark-xml_2.10" % "0.2.0",
  "com.databricks" % "spark-csv_2.10" % "1.4.0",
  "org.apache.spark" % "spark-catalyst_2.10" % "1.6.1",
  "org.apache.spark" % "spark-mllib_2.10" % "1.6.1",
  "com.101tec" % "zkclient" % "0.8",
  "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.2.0",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1",
  "com.stratio.datasource" % "spark-mongodb_2.10" % "0.11.1"
)

Spark Scala Code to connect to MongoDB Collection:

import com.stratio.datasource.mongodb._
import com.stratio.datasource.mongodb.config._
import com.stratio.datasource.mongodb.config.MongodbConfig._

val builder = MongodbConfigBuilder(Map(Host -> List("hostname:27017"), Database -> "test", Collection -> "testcollection", SamplingRatio -> 1.0, WriteConcern -> "normal"))
val readConfig = builder.build()
val mongoRDD = sqlContext.fromMongoDB(readConfig)
mongoRDD.registerTempTable("testtable")
val df = sqlContext.sql("SELECT * FROM testtable")
//df.printSchema()
df.take(30).foreach(println)

How I Created a Sample Database and Collection:

Start Mongo Services as below.

$bin/mongod
$bin/mongo

After this, I followed this website http://code.tutsplus.com/tutorials/getting-started-with-mongodb-part-1–net-22879 to insert records into the collection. The database is created instantly when we start inserting records.

db.nettuts.insert({
  first: 'matthew',
  last: 'setter',
  dob: '21/04/1978',
  gender: 'm',
  hair_colour: 'brown',
  occupation: 'developer',
  nationality: 'australian'
});
db.nettuts.insert({
  first: 'james',
  last: 'caan',
  dob: '26/03/1940',
  gender: 'm',
  hair_colour: 'brown',
  occupation: 'actor',
  nationality: 'american'
});
db.nettuts.insert({
  first: 'arnold',
  last: 'schwarzenegger',
  dob: '03/06/1925',
  gender: 'm',
  hair_colour: 'brown',
  occupation: 'actor',
  nationality: 'american'
});

I verified the collection by running this command in the mongo shell.

db.nettuts.find()

Sample Output after the Successful execution of Spark Program:

(572bd30135d30869b32e842e,{ "_id" : { "$oid" : "572bd30135d30869b32e842e"} , "first" : "matthew" , "last" : "setter" , "dob" : "21/04/1978" , "gender" : "m" , "hair_colour" : "brown" , "occupation" : "developer" , "nationality" : "australian"})
(572bd30135d30869b32e842f,{ "_id" : { "$oid" : "572bd30135d30869b32e842f"} , "first" : "james" , "last" : "caan" , "dob" : "26/03/1940" , "gender" : "m" , "hair_colour" : "brown" , "occupation" : "actor" , "nationality" : "american"})

Accessing BSON Files in Spark:
As another option, I wanted to read BSON data, which is MongoDB's underlying storage format, through a Spark program. When I researched around, I found this to be a solid library that supports reading BSON files from Spark through Scala.

"org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.5.1"

Scala Code to read BSON File:

import com.mongodb.hadoop.BSONFileInputFormat
import org.bson.BSONObject
val mongoRDD = sc.newAPIHadoopFile("/home/vgiri/nettuts.bson", classOf[BSONFileInputFormat].asSubclass(classOf[org.apache.hadoop.mapreduce.lib.input.FileInputFormat[Object,BSONObject]]), classOf[Object], classOf[BSONObject]).take(2)

I got the same output as you saw above.

How to Read BSON File through PySpark:

There is a pymongo_spark package available to support this. I went through the entire link below and set up the package accordingly.

https://github.com/mongodb/mongo-hadoop/blob/master/spark/src/main/python/README.rst

Python Code to read BSON File in PySpark:

import pymongo_spark
pymongo_spark.activate()
bsonFileRdd = sc.BSONFileRDD("/home/vgiri/nettuts.bson")
bsonFileRdd.take(5)

More importantly, the mongo-hadoop-spark.jar and the setup.py file should be added to the classpath while executing this.

Thanks and Have a Nice Learning 🙂 !!!