Loading Malformed Records in Spark through CSV Reader

Here is a use case I got from one of my customers: parse and process the records below through the CSV reader in Spark. Note that the records contain single and double quotes, as shown below.

Input.txt:

0|"I have one double quote as the first character of the field
"1|" I have two double quotes as the first character of the field
2|I have one " that is NOT the first character of the field"
3|I have two " that are NOT the first characters of the field"
4|I have no double quotes
5|' I have one single quote as the first character of the field
6|'' I have two single quotes as the first characters of the field
7|I have one ' that is NOT the first character of the field
8|I have two '' that are NOT the first characters of the field

How to Process this in Scala:

val addlOptions = Map(("header","false"),("delimiter","|"),("mode","PERMISSIVE"),("quote",null))
val df = sqlContext.read.format("csv")
  .options(addlOptions)
  .load("/FileStore/tables/d5hw1vsq1464390309397/doublequotes.txt")
df.show()

Please find the snapshot of the results below. I executed this in a Databricks notebook using Spark 1.6.2.

csv_double_quotes_issue_giri

How to Process this in Python:

sqlContext.read.format('com.databricks.spark.csv') \
  .options(header='false', delimiter='|', escape='\\', quote='\\') \
  .load('/FileStore/tables/d5hw1vsq1464390309397/doublequotes.txt').show()

Please find the snapshot of the results below. I executed this in a Databricks notebook using Spark 1.6.2.

Screen Shot 2016-05-27 at 4.53.46 PM

This way you can load all the malformed records present in a file through the spark-csv package without any data loss.
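To see what disabling quote handling does, here is a minimal pure-Python sketch using the standard csv module (an illustration of the same idea, not the spark-csv implementation):

```python
import csv
import io

# A few of the sample records above, pipe-delimited with stray quotes
data = '0|"I have one double quote as the first character of the field\n' \
       '4|I have no double quotes\n' \
       "7|I have one ' that is NOT the first character of the field"

# QUOTE_NONE makes the parser treat quote characters as ordinary text,
# the same effect as setting the quote option to null in spark-csv
reader = csv.reader(io.StringIO(data), delimiter='|', quoting=csv.QUOTE_NONE)
rows = list(reader)
for row in rows:
    print(row)
```

With quoting disabled, every record parses into exactly two fields and the stray quotes survive as plain characters, which is the "no data loss" behavior described above.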

 


Spark Data Frame : Check for Any Column values with ‘N’ and ‘Y’ and Convert the corresponding Column to Boolean using PySpark

Assume there are many columns in a data frame that are of string type but always have a value of “N” or “Y”. You would like to scan each column to determine whether this is true, and if the values really are just Y or N, change the column type to boolean with false/true as the cell values.

What the code below does:
1. Collects the column names and column types in a Python list
2. Iterates over a for loop and collects the distinct values of the columns in a two-dimensional array
3. In the loop, checks if the column type is string and the values are only ‘N’ or ‘Y’
4. If yes, converts the column to boolean and prints the values as true/false; else keeps the same type.
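The steps above can be illustrated in plain Python (a minimal sketch with hypothetical column data, not the actual PySpark code):

```python
# Hypothetical columns: name -> (type, values), mimicking a data frame schema
columns = {
    "flag1": ("string", ["Y", "N", "Y"]),
    "name":  ("string", ["a", "b", "c"]),
    "flag3": ("string", ["N", "N", "Y"]),
    "age":   ("int",    [1, 2, 3]),
}

converted = {}
for col, (ctype, values) in columns.items():
    # Step 3: string column whose distinct values are only 'Y'/'N'
    if ctype == "string" and set(values) <= {"Y", "N"}:
        # Step 4: convert to boolean true/false
        converted[col] = ("boolean", [v == "Y" for v in values])
    else:
        # Otherwise keep the same type and values
        converted[col] = (ctype, values)

print(converted["flag1"])  # ('boolean', [True, False, True])
```

Only the columns whose distinct values are a subset of {Y, N} change type; everything else passes through untouched, mirroring the flag1/flag3 behavior in the snapshot.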

PySpark Code:

See the Output:

I have attached a snapshot of the results. Note that the columns flag1 and flag3 are converted to true/false, while the rest of the columns are printed with their original values.

Screen Shot 2016-08-05 at 3.28.57 PM

 

 

Kafka => SparkStreaming => Elastic

In the last netcat => spark streaming => elastic tutorial, you saw data flow from Netcat Unix streams to Elasticsearch through Spark Streaming.

In this tutorial, I am going to walk you through some basics of Apache Kafka and how to move data in and out of Kafka. Then we will see how Spark Streaming can consume the data produced by Kafka. Finally, we will see how the Spark Streaming process can feed this data to Elasticsearch live.

What is Kafka:

Apache Kafka is a distributed publish-subscribe messaging system. It was originally developed at LinkedIn and later became an Apache project. Kafka is a fast, scalable, partitioned, and replicated commit log service, distributed by design.

This URL helped me understand Kafka better.

Procedure to Install Kafka in Mac OS:

If you are a Mac user, you can install it easily with brew install kafka once you have Homebrew on your machine. For other operating systems, please review the kafka.apache.org website.

Once it’s installed, cd into /usr/local/Cellar/kafka/0.9.0.1/bin and start the Kafka services as below.

Similarly, cd into /usr/local/Cellar/elasticsearch-2.3.3/bin/ and start the Elasticsearch service as below.

#! /bin/bash

#Provide Kafka, Elastic Installed Location
export kafkaLocation=/usr/local/Cellar/kafka/0.9.0.1/bin
export elasticLocation=/usr/local/Cellar/elasticsearch-2.3.3/bin
export kafkatopicName=wordcounttopic

#Start All Kafka Services
$kafkaLocation/zookeeper-server-start /usr/local/Cellar/kafka/0.9.0.1/libexec/config/zookeeper.properties &
$kafkaLocation/kafka-server-start /usr/local/Cellar/kafka/0.9.0.1/libexec/config/server.properties &
$elasticLocation/elasticsearch &

echo 'Kafka Elasticsearch Services started'

Start the Zookeeper Services:

./zookeeper-server-start /usr/local/Cellar/kafka/0.9.0.1/libexec/config/zookeeper.properties

Screen Shot 2016-06-15 at 2.43.56 PM

Start the Kafka Server as below:

./kafka-server-start /usr/local/Cellar/kafka/0.9.0.1/libexec/config/server.properties

Screen Shot 2016-06-16 at 9.09.50 PM

Start the Kafka Producer Topic:

$kafkaLocation/kafka-console-producer --broker-list localhost:9092 --topic $kafkatopicName

I typed the words below in the console.

Hello Kafka from Spark Streaming Learning Kafka is fun. and learning kafka with spark streaming is more fun and even when the destination is elasticsearch it will be more fun

Screen Shot 2016-06-16 at 9.12.14 PM

Now we will see how Spark consumes the data through its built-in Kafka consumer and sends it to Elasticsearch.

Spark Program : Write this in IntelliJ:

I referred to the code from the Databricks documentation and tweaked it a bit for the Elasticsearch indexing.
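Before looking at the output, the core transformation of such a job, a running word count maintained across micro-batches (updateStateByKey style), can be sketched in plain Python; the batches below are hypothetical, not the actual Kafka messages:

```python
from collections import Counter

# Hypothetical micro-batches of messages consumed from the Kafka topic
batches = [
    ["hello kafka from spark streaming"],
    ["learning kafka is fun"],
]

# Running state: word -> cumulative count, updated once per batch
state = Counter()
for batch in batches:
    words = [w for line in batch for w in line.split()]
    state.update(words)
    # In the real job, each (word, count) pair would be indexed into
    # Elasticsearch at this point

print(state["kafka"])  # cumulative count across both batches
```

The state survives from batch to batch, which is why the counts in the Elasticsearch index keep growing as new messages arrive on the topic.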

Once you click Run, you will see the output (wordcount.show()) in the Driver Console as below.

You will see the standard output below. The empty table continues to display until data is received from Kafka.

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/06/16 21:30:43 INFO SparkContext: Running Spark version 1.6.1
16/06/16 21:30:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/16 21:30:45 INFO SecurityManager: Changing view acls to: vgiridatabricks
16/06/16 21:30:45 INFO SecurityManager: Changing modify acls to: vgiridatabricks
16/06/16 21:30:45 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(vgiridatabricks); users with modify permissions: Set(vgiridatabricks)
16/06/16 21:30:45 INFO Utils: Successfully started service 'sparkDriver' on port 49990.
16/06/16 21:30:45 INFO Slf4jLogger: Slf4jLogger started
16/06/16 21:30:45 INFO Remoting: Starting remoting
16/06/16 21:30:45 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@10.10.10.111:49991]
16/06/16 21:30:45 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 49991.
16/06/16 21:30:45 INFO SparkEnv: Registering MapOutputTracker
16/06/16 21:30:45 INFO SparkEnv: Registering BlockManagerMaster
16/06/16 21:30:45 INFO DiskBlockManager: Created local directory at /private/var/folders/4q/fjc7vkmn4wn_m4fynrvxk2hc0000gn/T/blockmgr-568204eb-276e-4be7-8b0d-5d451f439eef
16/06/16 21:30:45 INFO MemoryStore: MemoryStore started with capacity 2.4 GB
16/06/16 21:30:45 INFO SparkEnv: Registering OutputCommitCoordinator
16/06/16 21:30:46 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/06/16 21:30:46 INFO SparkUI: Started SparkUI at http://10.10.10.111:4040
16/06/16 21:30:46 INFO Executor: Starting executor ID driver on host localhost
16/06/16 21:30:46 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 49992.
16/06/16 21:30:46 INFO NettyBlockTransferService: Server created on 49992
16/06/16 21:30:46 INFO BlockManagerMaster: Trying to register BlockManager
16/06/16 21:30:46 INFO BlockManagerMasterEndpoint: Registering block manager localhost:49992 with 2.4 GB RAM, BlockManagerId(driver, localhost, 49992)
16/06/16 21:30:46 INFO BlockManagerMaster: Registered BlockManager
16/06/16 21:30:46 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
16/06/16 21:30:46 INFO VerifiableProperties: Verifying properties
16/06/16 21:30:46 INFO VerifiableProperties: Property group.id is overridden to 
16/06/16 21:30:46 INFO VerifiableProperties: Property zookeeper.connect is overridden to 
Creating function called to create new StreamingContext
16/06/16 21:30:47 INFO WriteAheadLogManager  for Thread: Recovered 2 write ahead log files from file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/receivedBlockMetadata
16/06/16 21:30:47 INFO StateDStream: Checkpoint interval automatically set to 10000 ms
16/06/16 21:30:47 INFO ForEachDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.ForEachDStream@5626d18c
16/06/16 21:30:47 INFO StateDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.StateDStream@5634a861
16/06/16 21:30:47 INFO MappedDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.MappedDStream@66236a0a
16/06/16 21:30:47 INFO FlatMappedDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FlatMappedDStream@1dd74143
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.kafka.DirectKafkaInputDStream@7a45d714
16/06/16 21:30:47 INFO ForEachDStream: metadataCleanupDelay = -1
16/06/16 21:30:47 INFO StateDStream: metadataCleanupDelay = -1
16/06/16 21:30:47 INFO MappedDStream: metadataCleanupDelay = -1
16/06/16 21:30:47 INFO FlatMappedDStream: metadataCleanupDelay = -1
16/06/16 21:30:47 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Slide time = 2000 ms
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Checkpoint interval = null
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Remember duration = 60000 ms
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka.DirectKafkaInputDStream@7a45d714
16/06/16 21:30:47 INFO FlatMappedDStream: Slide time = 2000 ms
16/06/16 21:30:47 INFO FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/16 21:30:47 INFO FlatMappedDStream: Checkpoint interval = null
16/06/16 21:30:47 INFO FlatMappedDStream: Remember duration = 60000 ms
16/06/16 21:30:47 INFO FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@1dd74143
16/06/16 21:30:47 INFO MappedDStream: Slide time = 2000 ms
16/06/16 21:30:47 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/16 21:30:47 INFO MappedDStream: Checkpoint interval = null
16/06/16 21:30:47 INFO MappedDStream: Remember duration = 60000 ms
16/06/16 21:30:47 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@66236a0a
16/06/16 21:30:47 INFO StateDStream: Slide time = 2000 ms
16/06/16 21:30:47 INFO StateDStream: Storage level = StorageLevel(false, true, false, false, 1)
16/06/16 21:30:47 INFO StateDStream: Checkpoint interval = 10000 ms
16/06/16 21:30:47 INFO StateDStream: Remember duration = 60000 ms
16/06/16 21:30:47 INFO StateDStream: Initialized and validated org.apache.spark.streaming.dstream.StateDStream@5634a861
16/06/16 21:30:47 INFO ForEachDStream: Slide time = 2000 ms
16/06/16 21:30:47 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/16 21:30:47 INFO ForEachDStream: Checkpoint interval = null
16/06/16 21:30:47 INFO ForEachDStream: Remember duration = 60000 ms
16/06/16 21:30:47 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@5626d18c
16/06/16 21:30:47 INFO RecurringTimer: Started timer for JobGenerator at time 1466137848000
16/06/16 21:30:47 INFO JobGenerator: Started JobGenerator at 1466137848000 ms
16/06/16 21:30:47 INFO JobScheduler: Started JobScheduler
16/06/16 21:30:47 INFO StreamingContext: StreamingContext started
16/06/16 21:30:48 INFO StateDStream: Time 1466137846000 ms is invalid as zeroTime is 1466137846000 ms and slideDuration is 2000 ms and difference is 0 ms
16/06/16 21:30:48 INFO VerifiableProperties: Verifying properties
16/06/16 21:30:48 INFO VerifiableProperties: Property group.id is overridden to 
16/06/16 21:30:48 INFO VerifiableProperties: Property zookeeper.connect is overridden to 
16/06/16 21:30:48 INFO JobScheduler: Added jobs for time 1466137848000 ms
16/06/16 21:30:48 INFO JobGenerator: Checkpointing graph for time 1466137848000 ms
16/06/16 21:30:48 INFO DStreamGraph: Updating checkpoint data for time 1466137848000 ms
16/06/16 21:30:48 INFO JobScheduler: Starting job streaming job 1466137848000 ms.0 from job set of time 1466137848000 ms
16/06/16 21:30:48 INFO DStreamGraph: Updated checkpoint data for time 1466137848000 ms
16/06/16 21:30:48 INFO CheckpointWriter: Submitted checkpoint of time 1466137848000 ms writer queue
16/06/16 21:30:48 INFO CheckpointWriter: Saving checkpoint for time 1466137848000 ms to file 'file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1466137848000'
16/06/16 21:30:48 INFO CheckpointWriter: Deleting file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1465878822000.bk
16/06/16 21:30:48 INFO CheckpointWriter: Checkpoint for time 1466137848000 ms saved to file 'file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1466137848000', took 4209 bytes and 37 ms
16/06/16 21:30:49 INFO SparkContext: Starting job: show at KafkaSSCtoES.scala:71
16/06/16 21:30:49 INFO DAGScheduler: Registering RDD 2 (map at KafkaSSCtoES.scala:63)
16/06/16 21:30:49 INFO DAGScheduler: Got job 0 (show at KafkaSSCtoES.scala:71) with 1 output partitions
16/06/16 21:30:49 INFO DAGScheduler: Final stage: ResultStage 1 (show at KafkaSSCtoES.scala:71)
16/06/16 21:30:49 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
16/06/16 21:30:49 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
16/06/16 21:30:49 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at map at KafkaSSCtoES.scala:63), which has no missing parents
16/06/16 21:30:49 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 4.8 KB, free 4.8 KB)
16/06/16 21:30:49 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.5 KB, free 7.3 KB)
16/06/16 21:30:49 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:49992 (size: 2.5 KB, free: 2.4 GB)
16/06/16 21:30:49 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/06/16 21:30:49 INFO DAGScheduler: Submitting 3 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at map at KafkaSSCtoES.scala:63)
16/06/16 21:30:49 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
16/06/16 21:30:49 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,ANY, 2009 bytes)
16/06/16 21:30:49 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/06/16 21:30:49 INFO KafkaRDD: Beginning offset 4 is the same as ending offset skipping wordcounttopic 0
16/06/16 21:30:49 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1158 bytes result sent to driver
16/06/16 21:30:49 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,ANY, 2009 bytes)
16/06/16 21:30:49 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
16/06/16 21:30:49 INFO KafkaRDD: Beginning offset 5 is the same as ending offset skipping wordcounttopic 1
16/06/16 21:30:49 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1158 bytes result sent to driver
16/06/16 21:30:49 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 90 ms on localhost (1/3)
16/06/16 21:30:49 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, partition 2,ANY, 2009 bytes)
16/06/16 21:30:49 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
16/06/16 21:30:49 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 11 ms on localhost (2/3)
16/06/16 21:30:49 INFO KafkaRDD: Beginning offset 4 is the same as ending offset skipping wordcounttopic 2
16/06/16 21:30:49 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 1158 bytes result sent to driver
16/06/16 21:30:49 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 9 ms on localhost (3/3)
16/06/16 21:30:49 INFO DAGScheduler: ShuffleMapStage 0 (map at KafkaSSCtoES.scala:63) finished in 0.114 s
16/06/16 21:30:49 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/06/16 21:30:49 INFO DAGScheduler: looking for newly runnable stages
16/06/16 21:30:49 INFO DAGScheduler: running: Set()
16/06/16 21:30:49 INFO DAGScheduler: waiting: Set(ResultStage 1)
16/06/16 21:30:49 INFO DAGScheduler: failed: Set()
16/06/16 21:30:49 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[8] at show at KafkaSSCtoES.scala:71), which has no missing parents
16/06/16 21:30:49 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 9.4 KB, free 16.8 KB)
16/06/16 21:30:49 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.7 KB, free 21.4 KB)
16/06/16 21:30:49 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:49992 (size: 4.7 KB, free: 2.4 GB)
16/06/16 21:30:49 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/06/16 21:30:49 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[8] at show at KafkaSSCtoES.scala:71)
16/06/16 21:30:49 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/06/16 21:30:49 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 3, localhost, partition 0,PROCESS_LOCAL, 1894 bytes)
16/06/16 21:30:49 INFO Executor: Running task 0.0 in stage 1.0 (TID 3)
16/06/16 21:30:49 INFO CacheManager: Partition rdd_4_0 not found, computing it
16/06/16 21:30:49 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:49992 in memory (size: 2.5 KB, free: 2.4 GB)
16/06/16 21:30:49 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 3 blocks
16/06/16 21:30:49 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 7 ms
16/06/16 21:30:49 INFO MemoryStore: Block rdd_4_0 stored as bytes in memory (estimated size 4.0 B, free 14.1 KB)
16/06/16 21:30:49 INFO BlockManagerInfo: Added rdd_4_0 in memory on localhost:49992 (size: 4.0 B, free: 2.4 GB)
16/06/16 21:30:50 INFO GenerateUnsafeProjection: Code generated in 175.318511 ms
16/06/16 21:30:50 INFO JobScheduler: Added jobs for time 1466137850000 ms
16/06/16 21:30:50 INFO JobGenerator: Checkpointing graph for time 1466137850000 ms
16/06/16 21:30:50 INFO DStreamGraph: Updating checkpoint data for time 1466137850000 ms
16/06/16 21:30:50 INFO DStreamGraph: Updated checkpoint data for time 1466137850000 ms
16/06/16 21:30:50 INFO CheckpointWriter: Submitted checkpoint of time 1466137850000 ms writer queue
16/06/16 21:30:50 INFO CheckpointWriter: Saving checkpoint for time 1466137850000 ms to file 'file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1466137850000'
16/06/16 21:30:50 INFO GenerateSafeProjection: Code generated in 12.79301 ms
16/06/16 21:30:50 INFO Executor: Finished task 0.0 in stage 1.0 (TID 3). 1915 bytes result sent to driver
16/06/16 21:30:50 INFO CheckpointWriter: Deleting file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1465878822000
16/06/16 21:30:50 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 3) in 493 ms on localhost (1/1)
16/06/16 21:30:50 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
16/06/16 21:30:50 INFO CheckpointWriter: Checkpoint for time 1466137850000 ms saved to file 'file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1466137850000', took 4259 bytes and 27 ms
16/06/16 21:30:50 INFO DAGScheduler: ResultStage 1 (show at KafkaSSCtoES.scala:71) finished in 0.496 s
16/06/16 21:30:50 INFO DAGScheduler: Job 0 finished: show at KafkaSSCtoES.scala:71, took 0.865830 s
+----+-----+
|word|count|
+----+-----+
+----+-----+

Now I hit Enter in the Kafka producer console and check the driver output in IntelliJ.

Screen Shot 2016-06-16 at 9.38.35 PM

Now see the Output in Elastic Search head plugin :

Start the elasticsearch service as below.

$ cd /usr/local/Cellar/elastic*/bin

./elasticsearch

As per the above program, we are creating the index name and type as kafkawordcount/kwc. The below snapshot shows the head plugin before the execution of the Spark Streaming job. You can see there is no index called kafkawordcount.

Screen Shot 2016-06-16 at 9.40.24 PM

 

See the output in the Elasticsearch head plugin after the first hit in the Kafka producer console of the words below.

Hello Kafka from Spark Streaming Learning Kafka is fun. and learning kafka with spark streaming is more fun and even when the destination is elasticsearch it will be more fun

Screen Shot 2016-06-16 at 9.44.02 PM

See the full output:

Screen Shot 2016-06-16 at 9.45.13 PM

Hope you enjoyed this tutorial.

 

 

Netcat => Spark Streaming => ElasticSearch

In this tutorial, let me walk you through how words typed into a Unix Netcat channel are streamed through Spark and written live into an Elasticsearch index as (word, count) pairs. Note that the Spark-to-Elasticsearch ingestion automatically creates the index during the ingestion process.

What steps I followed:

  1. Install elasticsearch (brew install elasticsearch)
  2. Install mobz/head plugin (https://github.com/mobz/elasticsearch-head)
  3. Open a Netcat channel in Unix streams and type some words
  4. Launch a spark program with org.apache.spark.streaming._ imported libraries
    1. Read the socketTextStream through the Spark streaming context via port 9999
    2. flatMap the words
    3. For each DStream RDD => convert it to a data frame and register a temp table
    4. Save each data frame to an Elasticsearch index
  5. Open http://localhost:9200/_plugin/head and keep refreshing the page to see the latest word, count columns in an ES index.
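The per-batch transformation in step 4 can be sketched in plain Python (an illustration with a hypothetical batch of lines, not the actual Spark program):

```python
from collections import Counter

# Hypothetical batch of lines received on the socket in one micro-batch
batch = ["hello spark streaming", "hello elasticsearch"]

# Step 4.2: flatMap each line into words, then count per word
counts = Counter(w for line in batch for w in line.split())

# Step 4.4: each (word, count) pair becomes one Elasticsearch document
docs = [{"word": w, "count": c} for w, c in sorted(counts.items())]
print(docs)
```

Each micro-batch produces one such set of documents, which is why the document count in the head plugin jumps after every Enter in the Netcat console.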

No Index is present:

Screen Shot 2016-06-09 at 10.15.41 PM

Create the below program in IntelliJ and execute it:

After the execution in IntelliJ, you will see a streaming context launched locally, and the job keeps running in the Run console.

Open Netcat and Type some words and Press Enter:

$ nc -lk 9999

Iteration 1:

(23 Documents must be stored)

Screen Shot 2016-06-09 at 10.20.18 PM

Now see the words in Elastic Search with the Index Auto Created.

Screen Shot 2016-06-09 at 10.21.38 PM

Screen Shot 2016-06-09 at 10.22.11 PM

Iteration 2:

Enter some other words and Press Enter

Screen Shot 2016-06-09 at 10.23.40 PM.png

Now, see it in the Elastic Search Index:

You can see document count increased to 49 as below.

Screen Shot 2016-06-09 at 10.24.38 PM

Screen Shot 2016-06-09 at 10.25.31 PM

This was all possible with the magic of the Spark + DataFrame + ES APIs. In the same way, you can write to Cassandra, Mongo, HBase, Oracle, MySQL, or any other destination database or system.

Now you can stop the Spark Streaming job. Hope you enjoyed this tutorial.

 

 

How to Transpose Columns to Rows in Spark Dataframe

Let’s say you have input like this.

Screen Shot 2016-05-28 at 7.21.38 PM

and you want the output as below, meaning all these columns have to be transposed to rows using the Spark DataFrame approach. As you know, there is no direct way to do a transpose in Spark. In some cases we can use pivot, but I haven’t tried that part yet.

Screen Shot 2016-05-28 at 7.22.33 PM

What have I done?

  1. Created a sequence and converted it to a data frame with 3 column names assigned
  2. Used the collect_list function to combine all the values of each column into an array list
  3. Split the array list using a custom delimiter (‘:’)
  4. Read each element of the array list and output it as a separate column in SQL; did the same for all columns
  5. Union all the converted columns and create a final data frame
  6. I have used the Spark SQL approach here.
  7. Please note that this approach only works for a small set of columns. There could be easier ways too; if you know one, please reply in a comment here.
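For intuition, the end result of such a transpose can be pictured in plain Python with zip (an illustration of the target shape, not the Spark SQL code):

```python
# The example rows: a header-like first row plus three data rows
rows = [("Titanic", "IronMan", "JungleBook"),
        ("1", "10", "20"),
        ("2", "30", "40"),
        ("1", "30", "20")]

# zip(*rows) swaps rows and columns: each original column becomes a row
transposed = [list(col) for col in zip(*rows)]
for row in transposed:
    print(row)
```

Each of the 3 original columns becomes one of 3 output rows of length 4, which is exactly what the union of the three split selects produces in the Spark SQL version.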

IntelliJ Version of the Code:

I tried this example using Spark 1.6.1.

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by vgiridatabricks on 5/28/16.
  */
object SparkTranspose {
  def main(args: Array[String]) : Unit = {
    val conf = new SparkConf()
      .setAppName("csvParser")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._
    val df = sc.parallelize(Seq(("Titanic","IronMan","JungleBook"),("1","10","20"),("2","30","40"),("1","30","20"))).toDF("a","b","c")
    df.registerTempTable("transpose")
    sqlContext.sql("select concat_ws(':',a.cola) as cola_new,concat_ws(':',a.colb) as colb_new,concat_ws(':',a.colc) as colc_new from (select collect_list(a) as cola, collect_list(b) as colb, collect_list(c) as colc from transpose)a").registerTempTable("tablesplitter")
    sqlContext.sql(
      """
      select split(cola_new,':')[0] as col1, split(cola_new,':')[1] as col2, split(cola_new,':')[2] as col3, split(cola_new,':')[3] as col4 from tablesplitter
      union all
      select split(colb_new,':')[0] as col1, split(colb_new,':')[1] as col2, split(colb_new,':')[2] as col3, split(colb_new,':')[3] as col4 from tablesplitter
      union all
      select split(colc_new,':')[0] as col1, split(colc_new,':')[1] as col2, split(colc_new,':')[2] as col3, split(colc_new,':')[3] as col4 from tablesplitter
      """)
      .show()
  }

}

Output:

Screen Shot 2016-05-28 at 12.34.54 PM

Hope you enjoyed this blog!

Generate Unique IDs for Each Rows in a Spark Dataframe

Let’s see how to create Unique IDs for each of the rows present in a Spark DataFrame.

Steps to produce this:

Option 1 => Using the monotonicallyIncreasingId or zipWithUniqueId methods

  1. Create a data frame from a parallel collection
  2. Apply a Spark data frame method to generate monotonically increasing unique IDs
import org.apache.spark.sql.functions._ 
val df = sc.parallelize(Seq(("Databricks", 20000), ("Spark", 100000), ("Hadoop", 3000))).toDF("word", "count") 
df.withColumn("uniqueID",monotonicallyIncreasingId).show()

Screen Shot 2016-05-23 at 4.13.37 PM

import org.apache.spark.sql.types.{StructType, StructField, LongType}
val df = sc.parallelize(Seq(("Databricks", 20000), ("Spark", 100000), ("Hadoop", 3000))).toDF("word", "count")
val wcschema = df.schema
val inputRows = df.rdd.zipWithUniqueId.map{
   case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
val wcID = sqlContext.createDataFrame(inputRows, StructType(StructField("id", LongType, false) +: wcschema.fields))

Screen Shot 2016-05-23 at 4.13.46 PM
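The effect of zipWithUniqueId, prepending an id to each row, can be pictured in plain Python (illustration only; Spark’s actual ids are only guaranteed unique, not consecutive, since they depend on partitioning):

```python
rows = [("Databricks", 20000), ("Spark", 100000), ("Hadoop", 3000)]

# Prepend an id to each row, analogous to id +: r.toSeq in the Scala code
with_id = [(i,) + r for i, r in enumerate(rows)]
for row in with_id:
    print(row)
```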

Option 2 => Use the row_number function

With PartitionBy Column:

val df = sc.parallelize(Seq(("Databricks", 20000), ("Spark", 100000), ("Hadoop", 3000))).toDF("word", "count")
df.registerTempTable("wordcount")
val tmpTable = sqlContext.sql("select row_number() over (partition by word order by count) as rnk,word,count from wordcount")
tmpTable.show()

Screen Shot 2016-05-23 at 8.12.15 PM

Without PartitionBy Column:

val tmpTable1 = sqlContext.sql("select row_number() over (order by count) as rnk,word,count from wordcount")
tmpTable1.show()

Screen Shot 2016-05-23 at 8.13.09 PM
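The row_number-over-window semantics can be sketched in plain Python (a minimal illustration of partition-by/order-by numbering, not Spark’s implementation):

```python
from itertools import groupby

rows = [("Databricks", 20000), ("Spark", 100000), ("Hadoop", 3000)]

# Partition by word, order by count, then number the rows in each partition
ranked = []
for word, group in groupby(sorted(rows), key=lambda r: r[0]):
    for rnk, (w, count) in enumerate(sorted(group, key=lambda r: r[1]), 1):
        ranked.append((rnk, w, count))

# Every word is unique here, so each partition has a single row with rnk 1
print(ranked)
```

Dropping the partition key (the "without PartitionBy" case) is equivalent to numbering all rows in one global ordering by count.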

 

How to Connect Cassandra and Spark with Guava Shade dependency using SBT

What is Cassandra?

Cassandra is a distributed database for managing large amounts of structured data across many commodity servers, while providing highly available service and no single point of failure.  Cassandra offers capabilities that relational databases and other NoSQL databases simply cannot match such as: continuous availability, linear scale performance, operational simplicity and easy data distribution across multiple data centers and cloud availability zones.
Why Cassandra?
Do you need a more flexible data model than what’s offered in the relational database world? Would you like to start with a database you know can scale to meet any number of concurrent user connections and/or data volume size and run blazingly fast? Have you been needing a database that has no single point of failure and one that can easily distribute data among multiple geographies, data centers, and the cloud? Well, that’s Cassandra.

Installing in MacOS:

$ brew install cassandra

$ cd /usr/local/Cellar/cassandra/3.5/bin

$ ./cqlsh

Screen Shot 2016-05-21 at 11.26.16 PM

Let’s connect this table from Spark and read as RDD and DataFrame:

import com.datastax.spark.connector._
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by vgiridatabricks on 5/21/16.
  */
object SparkCassandra {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "127.0.0.1")
      // .set("spark.cassandra.auth.username", "username")
      // .set("spark.cassandra.auth.password", "password")
      .setMaster("local").setAppName("SparkCassandra")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    //Just test Spark Cassandra connectivity
    val tableConn = sc.cassandraTable("sparktestcassandra", "emp")
    print(tableConn)

    //Read from Cassandra table as a DataFrame
    val df = sqlContext
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "emp", "keyspace" -> "sparktestcassandra"))
      .load()
    df.show()
  }
}

df.show() -> see the output in IntelliJ

Screen Shot 2016-05-21 at 11.28.15 PM

Writing RDD to Cassandra:

Just create a table to get the data saved in a Cassandra Table.

cqlsh:sparktestcassandra> create table wordcount (word varchar primary key, count int);

//Write to Cassandra table
val sampleRdd = sc.parallelize(Array(("Databricks", 101), ("Spark", 110), ("Big Data", 10), ("Hadoop", 40), ("Falcon", 50)))
sampleRdd.saveToCassandra("sparktestcassandra", "wordcount", SomeColumns("word", "count"))

Screen Shot 2016-05-21 at 11.40.36 PM

You can see the data is inserted into the Cassandra table “wordcount”.

cqlsh:sparktestcassandra> select * from wordcount;

Screen Shot 2016-05-21 at 11.42.02 PM

How I created a Spark Cassandra Shaded Jar:

I have used the Spark Cassandra connector available on GitHub, with a small modification: shading the Guava dependency. Please see how I built the shade for Guava.

  1. Check out the Spark Cassandra GitHub code and clone it to a local directory on your machine
  2. Add this in project/assembly.sbt: `addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")`
  3. Add the sbt-assembly jar as a separate dependency library so that you can create the assembly shade in build.sbt as below.
  4. Make the below changes in build.sbt file
  5. import sbt.Keys._
     import sbt._
     //import sbtassembly.AssemblyKeys._
     import sbtassembly.AssemblyPlugin.autoImport._

     version := "0.1-SNAPSHOT"
     scalaVersion := "2.10.5"
     name := "spark-cassandra-guava-shade"
     scalacOptions := Seq("-deprecation", "-unchecked", "-feature")

     libraryDependencies ++= Seq(
       "com.datastax.spark" %% "spark-cassandra-connector" % "1.6.0-M2"
     )

     // There is a conflict between the Guava versions in the Cassandra driver and Hadoop,
     // so shade the Guava package
     assemblyShadeRules in assembly := Seq(ShadeRule.rename("com.google.**" -> "shadeio.@1").inAll)
     assemblyJarName in assembly := s"${name.value}-${version.value}.jar"
     assemblyMergeStrategy in assembly := {
       case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
       case _ => MergeStrategy.first
     }
  6. Navigate to the project root directory and run `sbt clean assembly`; this produces a new jar with the Guava libraries shaded.
  7. Manually include this library as a dependency jar along with your spark dependencies.
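To make step 5 concrete: the ShadeRule above rewrites package names at the bytecode level. A minimal plain-Scala sketch of the rename pattern (this is just string handling for illustration; the real sbt-assembly rule operates on class files, not strings) looks like this:

```scala
// Illustrative sketch of ShadeRule.rename("com.google.**" -> "shadeio.@1"):
// every class under com.google is relocated into the shadeio package,
// everything else is left untouched.
def shade(className: String): String =
  if (className.startsWith("com.google."))
    "shadeio." + className.stripPrefix("com.google.")
  else className

println(shade("com.google.common.base.Joiner"))   // shadeio.common.base.Joiner
println(shade("org.apache.spark.SparkContext"))   // unchanged
```

Because the Cassandra driver classes inside the shaded jar are rewritten to reference `shadeio.*`, they no longer collide with the unshaded Guava 14 that Spark itself brings along.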

Please note this build.sbt creates only the Spark Cassandra Connector with the Guava dependency shaded. If you want to use the above Spark program, you will have to create a new SBT project with the Spark dependencies in build.sbt and add the shaded Spark Cassandra jar as a separate dependency.

That build.sbt file looks something like this.

import sbt.Keys._
 import sbt._
 //import sbtassembly.AssemblyKeys._
 import sbtassembly.AssemblyPlugin.autoImport._
 version := "0.1-SNAPSHOT"
 organization := "com.giri.test"
 scalaVersion := "2.10.5"
 name := "spark-cassandra-guava-shade"
 scalacOptions := Seq("-deprecation", "-unchecked", "-feature")

libraryDependencies ++= Seq(
 "org.apache.spark" % "spark-core_2.10" % "1.6.1",
 "org.apache.spark" % "spark-sql_2.10" % "1.6.1",
 "org.apache.spark" % "spark-hive_2.10" % "1.6.1",
 "org.apache.spark" % "spark-yarn_2.10" % "1.6.1",
 "com.databricks" % "spark-xml_2.10" % "0.2.0",
 "com.databricks" % "spark-csv_2.10" % "1.4.0",
 "org.apache.spark" % "spark-catalyst_2.10" % "1.6.1",
 "org.apache.spark" % "spark-mllib_2.10" % "1.6.1",
 "com.101tec" % "zkclient" % "0.8",
 "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.2.0",
 "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1",
 "com.stratio.datasource" % "spark-mongodb_2.10" % "0.11.1",
 "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.5.1",
   "com.datastax.spark" %% "spark-cassandra-connector" % "1.6.0-M2"
 )

// There is a conflict between the Guava versions in the Cassandra driver and Hadoop
 // Shading Guava Package
 assemblyShadeRules in assembly := Seq(ShadeRule.rename("com.google.**" -> "shadeio.@1").inAll)

assemblyJarName in assembly := s"${name.value}-${version.value}.jar"

assemblyMergeStrategy in assembly := {
 case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
 case _ => MergeStrategy.first
 }
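The assemblyMergeStrategy block encodes a simple decision: when several jars ship the same file path, drop conflicting manifests and keep the first copy of everything else. A plain-Scala sketch of that decision logic (an illustration only, not the sbt-assembly API, which works with PathList and MergeStrategy values):

```scala
// Mirrors the merge decision in the build.sbt above.
def mergeDecision(path: String): String = path match {
  case "META-INF/MANIFEST.MF" => "discard" // conflicting manifests are dropped
  case _                      => "first"   // otherwise keep the first jar's copy
}

println(mergeDecision("META-INF/MANIFEST.MF")) // discard
println(mergeDecision("reference.conf"))       // first
```

Discarding MANIFEST.MF matters because every dependency jar carries its own manifest, and merging them naively makes the assembly fail.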

What is an Uber Jar, and Why is a Shaded Dependency Required in an Uber Jar?

An uber-jar takes all dependencies, extracts their contents, and puts them together with the classes/resources of the project itself in one big JAR. Such an uber-jar makes execution easy, because you need only one big JAR instead of dozens of small JARs to run your app. It also eases distribution in some cases. sbt-assembly helps you create an uber-jar in a Scala-based project.

Creating an uber-jar for ease of deployment is one use case of the shade plugin. There are also other common use cases, which involve package renaming.

For example, suppose I am developing a Foo library which depends on a specific version (e.g. 1.0) of a Bar library, and assume I cannot use any other version of Bar (because of API changes or other technical issues). If I simply declare Bar:1.0 as Foo‘s dependency in Maven, it is possible to run into a problem: a Qux project depends on Foo, and also on Bar:2.0 (and it cannot use Bar:1.0 because Qux needs a new feature in Bar:2.0). Here is the dilemma: should Qux use Bar:1.0 (with which Qux‘s code will not work) or Bar:2.0 (with which Foo‘s code will not work)?

In order to solve this problem, the developer of Foo can use the shade plugin to rename its usage of Bar, so that all classes from the Bar:1.0 jar are embedded in the Foo jar, and the package of the embedded Bar classes is changed from com.bar to com.foo.bar. By doing so, Qux can safely depend on Bar:2.0, because Foo no longer depends on Bar directly; it uses its own copy of the “altered” Bar located in another package.

In my example, the Spark Cassandra Connector did not actually get Guava 16.0 at runtime; instead it picked up the Guava classes from the Spark core packages. The reason I created the shaded jar is that I ran into the below error while using the Spark Cassandra Connector directly.

16/05/22 10:59:35 INFO Version: Elasticsearch Hadoop v2.2.0 [c11e37bde9]
Exception in thread "main" java.lang.ExceptionInInitializerError
    at com.datastax.spark.connector.cql.DefaultConnectionFactory$.clusterBuilder(CassandraConnectionFactory.scala:36)
    at com.datastax.spark.connector.cql.DefaultConnectionFactory$.createCluster(CassandraConnectionFactory.scala:88)
    ...
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.IllegalStateException: Detected Guava issue #1635 which indicates that a version of Guava less than 16.01 is in use.  This introduces codec resolution issues and potentially other incompatibility issues in the driver.  Please upgrade to Guava 16.01 or later.
    at com.datastax.driver.core.SanityChecks.checkGuava(SanityChecks.java:62)
    at com.datastax.driver.core.SanityChecks.check(SanityChecks.java:36)
    at com.datastax.driver.core.Cluster.<clinit>(Cluster.java:67)
    ... 21 more

It looks like Spark 1.6.0 uses Guava 14.0 (http://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10/1.6.0) while the Spark Cassandra Connector (on GitHub) uses Guava 16.0 (http://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector_2.10/1.6.0-M2). That is why shading is required here.
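When you are unsure which Guava actually wins on your classpath, a small diagnostic helps. The jarOf helper below is my own, not part of any library; it reports the jar a class was loaded from, and returns None for JDK bootstrap classes, which have no code source:

```scala
// Print the code source (jar location) of a class, if any.
def jarOf(className: String): Option[String] =
  Option(Class.forName(className).getProtectionDomain.getCodeSource)
    .map(_.getLocation.toString)

// In a Spark app you could check e.g. jarOf("com.google.common.base.Optional")
// to see whether Spark's Guava 14 or the shaded copy is being picked up.
// Bootstrap classes such as java.lang.String report no code source:
println(jarOf("java.lang.String")) // None
```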

I have uploaded the shaded Spark Cassandra Guava jar here. You can use it along with your Spark dependencies.

References:

http://stackoverflow.com/questions/13620281/what-is-the-maven-shade-plugin-used-for-and-why-would-you-want-to-relocate-java

Hope you enjoyed this tutorial.