How to Connect Cassandra and Spark with a Shaded Guava Dependency Using SBT

What is Cassandra?

Cassandra is a distributed database for managing large amounts of structured data across many commodity servers, providing a highly available service with no single point of failure. Cassandra offers capabilities that relational databases and other NoSQL databases simply cannot match, such as continuous availability, linear-scale performance, operational simplicity, and easy data distribution across multiple data centers and cloud availability zones.
Why Cassandra?

Do you need a more flexible data model than what’s offered in the relational database world? Would you like to start with a database you know can scale to meet any number of concurrent user connections and/or any data volume, and run blazingly fast? Do you need a database that has no single point of failure and that can easily distribute data among multiple geographies, data centers, and clouds? Well, that’s Cassandra.

Installing on macOS:

$ brew install cassandra

$ cd /usr/local/Cellar/cassandra/3.5/bin

$ ./cqlsh

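Before connecting from Spark, you need a keyspace and a table to read from. You can create them in cqlsh, or directly from Scala through the connector's CassandraConnector helper. A minimal sketch follows; the emp column layout here is only an assumed example schema:

import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.SparkConf

object CreateSchema {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "127.0.0.1")

    // Run plain CQL statements through the connector-managed session
    CassandraConnector(conf).withSessionDo { session =>
      session.execute(
        "CREATE KEYSPACE IF NOT EXISTS sparktestcassandra " +
          "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
      // The emp columns below are only an illustrative guess at the schema
      session.execute(
        "CREATE TABLE IF NOT EXISTS sparktestcassandra.emp (id int PRIMARY KEY, name text)")
    }
  }
}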

Let’s connect to the emp table in the sparktestcassandra keyspace from Spark and read it both as an RDD and as a DataFrame:

import com.datastax.spark.connector._
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by vgiridatabricks on 5/21/16.
 */
object SparkCassandra {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "127.0.0.1")
      // .set("spark.cassandra.auth.username", "username")
      // .set("spark.cassandra.auth.password", "password")
      .setMaster("local")
      .setAppName("SparkCassandra")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Test Spark-Cassandra connectivity; count() is an action, so it forces an actual read
    val tableConn = sc.cassandraTable("sparktestcassandra", "emp")
    println(tableConn.count())

    // Read the Cassandra table as a DataFrame
    val df = sqlContext
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "emp", "keyspace" -> "sparktestcassandra"))
      .load()
    df.show()
  }
}

df.show() prints the contents of the emp table to the console; you can see the output in the IntelliJ run window.

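Beyond a full-table scan, the connector's RDD API can also push column selection and row filtering down to Cassandra. A minimal sketch, continuing with the sc defined above (the id and name columns are assumptions about the emp schema, not the actual table definition):

// Fetch only the selected columns and the matching rows from Cassandra
val filtered = sc.cassandraTable("sparktestcassandra", "emp")
  .select("id", "name")
  .where("id = ?", 1)
filtered.collect().foreach(println)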

Writing RDD to Cassandra:

First, create a table in cqlsh so there is somewhere for the data to be saved:

cqlsh:sparktestcassandra> create table wordcount (word varchar primary key, count int);

// Write to the Cassandra table
val sampleRdd = sc.parallelize(Array(("Databricks", 101), ("Spark", 110), ("Big Data", 10), ("Hadoop", 40), ("Falcon", 50)))
sampleRdd.saveToCassandra("sparktestcassandra", "wordcount", SomeColumns("word", "count"))
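The same kind of write can also be done through the DataFrame API. A short sketch, reusing the sqlContext from the program above (the sample rows are made up; SaveMode.Append simply adds to whatever is already in the table):

import org.apache.spark.sql.SaveMode

// Build a small DataFrame and append it to the same Cassandra table
val wordsDf = sqlContext
  .createDataFrame(Seq(("Scala", 25), ("Cassandra", 30)))
  .toDF("word", "count")
wordsDf.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "wordcount", "keyspace" -> "sparktestcassandra"))
  .mode(SaveMode.Append)
  .save()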


You can verify that the data was inserted into the Cassandra table “wordcount”:

cqlsh:sparktestcassandra> select * from wordcount;

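You can also read the rows back from Spark itself to confirm the round trip (again using the sc from the program above):

// Collect the wordcount table back into the driver and print it
val saved = sc.cassandraTable("sparktestcassandra", "wordcount")
saved.collect().foreach(println)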

How I created a Spark Cassandra Shaded Jar:

I used the Spark Cassandra Connector source available on GitHub, with one small modification: shading the Guava dependency. Here is how I built the shaded jar for Guava.

  1. Check out the Spark Cassandra Connector code from GitHub and clone it to a local directory on your machine.
  2. Add the sbt-assembly plugin in project/assembly.sbt: `addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")`
  3. Adding sbt-assembly as a separate plugin dependency makes the assembly (shading) settings available in build.sbt, as used below.
  4. Make the following changes in the build.sbt file:

     import sbt.Keys._
     import sbt._
     //import sbtassembly.AssemblyKeys._
     import sbtassembly.AssemblyPlugin.autoImport._

     version := "0.1-SNAPSHOT"
     scalaVersion := "2.10.5"
     name := "spark-cassandra-guava-shade"
     scalacOptions := Seq("-deprecation", "-unchecked", "-feature")

     libraryDependencies ++= Seq(
       "com.datastax.spark" %% "spark-cassandra-connector" % "1.6.0-M2"
     )

     // There is a conflict between the Guava versions used by the Cassandra driver and Hadoop
     // Shading the Guava package
     assemblyShadeRules in assembly := Seq(ShadeRule.rename("com.google.**" -> "shadeio.@1").inAll)

     assemblyJarName in assembly := s"${name.value}-${version.value}.jar"

     assemblyMergeStrategy in assembly := {
       case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
       case _ => MergeStrategy.first
     }
  5. Navigate to the project root directory and run sbt clean assembly; this produces a new jar in which the Guava classes are shaded.
  6. Manually include this jar as a dependency alongside your Spark dependencies.

Please note that this build.sbt builds only the Spark Cassandra Connector with the Guava dependency shaded. If you want to run the Spark program above, you will have to create a new SBT project with the Spark dependencies in its build.sbt and add the shaded Spark Cassandra jar as a separate dependency (one way to reference that jar is sketched after the build.sbt below).

That build.sbt file looks something like this.

import sbt.Keys._
import sbt._
//import sbtassembly.AssemblyKeys._
import sbtassembly.AssemblyPlugin.autoImport._

version := "0.1-SNAPSHOT"
organization := "com.giri.test"
scalaVersion := "2.10.5"
name := "spark-cassandra-guava-shade"
scalacOptions := Seq("-deprecation", "-unchecked", "-feature")

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.6.1",
  "org.apache.spark" % "spark-sql_2.10" % "1.6.1",
  "org.apache.spark" % "spark-hive_2.10" % "1.6.1",
  "org.apache.spark" % "spark-yarn_2.10" % "1.6.1",
  "com.databricks" % "spark-xml_2.10" % "0.2.0",
  "com.databricks" % "spark-csv_2.10" % "1.4.0",
  "org.apache.spark" % "spark-catalyst_2.10" % "1.6.1",
  "org.apache.spark" % "spark-mllib_2.10" % "1.6.1",
  "com.101tec" % "zkclient" % "0.8",
  "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.2.0",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1",
  "com.stratio.datasource" % "spark-mongodb_2.10" % "0.11.1",
  "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.5.1",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.6.0-M2"
)

// There is a conflict between the Guava versions used by the Cassandra driver and Hadoop
// Shading the Guava package
assemblyShadeRules in assembly := Seq(ShadeRule.rename("com.google.**" -> "shadeio.@1").inAll)

assemblyJarName in assembly := s"${name.value}-${version.value}.jar"

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
  case _ => MergeStrategy.first
}
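The shaded connector jar itself is not fetched from a repository, so it has to be referenced directly. One simple approach is to copy it into the downstream project's lib/ directory, which SBT picks up automatically as an unmanaged dependency; alternatively, it can be referenced explicitly in build.sbt. A sketch, assuming the jar name produced by the assembly step above:

// Reference the shaded connector jar directly as an unmanaged dependency
unmanagedJars in Compile += Attributed.blank(baseDirectory.value / "lib" / "spark-cassandra-guava-shade-0.1-SNAPSHOT.jar")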

What is an Uber Jar, and Why is a Shaded Dependency Needed in One?

An uber-jar takes all of a project's dependencies, extracts their contents, and puts them together with the classes/resources of the project itself in one big JAR. Such an uber-jar makes execution easy, because you need only one big JAR instead of tons of small JARs to run your app; it can also ease distribution in some cases. sbt-assembly helps you create an uber-jar in a Scala-based project.

Creating an uber-jar for ease of deployment is one use case of the shade plugin. There are also other common use cases, which involve package renaming.

For example, suppose I am developing a Foo library that depends on a specific version (e.g. 1.0) of a Bar library, and I cannot use any other version of Bar (because of API changes or other technical issues). If I simply declare Bar:1.0 as Foo's dependency in Maven, it is possible to fall into a problem: a Qux project depends on Foo and also on Bar:2.0 (and it cannot use Bar:1.0 because Qux needs a new feature in Bar:2.0). Here is the dilemma: should Qux use Bar:1.0 (with which Qux's code will not work) or Bar:2.0 (with which Foo's code will not work)?

In order to solve this problem, the developer of Foo can use the shade plugin to rename its usage of Bar, so that all classes from the Bar:1.0 jar are embedded in the Foo jar, and the package of the embedded Bar classes is changed from com.bar to com.foo.bar. By doing so, Qux can safely depend on Bar:2.0, because Foo no longer depends on Bar directly; it uses its own copy of the “altered” Bar located in another package.
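In sbt-assembly, this kind of package relocation is expressed with a ShadeRule, just like the Guava rule used earlier. A sketch of what the hypothetical Foo library's build.sbt might contain (com.bar and com.foo.bar are only the illustrative package names from the example above):

// Relocate every class under com.bar into com.foo.bar inside Foo's uber-jar
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.bar.**" -> "com.foo.bar.@1").inAll
)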

In my case, the Spark Cassandra Connector does not end up with Guava 16.0 at runtime; instead it picks up the older Guava classes that ship with the Spark core packages. I created the shaded jar because I ran into the error below while using the Spark Cassandra Connector directly.

16/05/22 10:59:35 INFO Version: Elasticsearch Hadoop v2.2.0 [c11e37bde9]
Exception in thread "main" java.lang.ExceptionInInitializerError
    at com.datastax.spark.connector.cql.DefaultConnectionFactory$.clusterBuilder(CassandraConnectionFactory.scala:36)
    at com.datastax.spark.connector.cql.DefaultConnectionFactory$.createCluster(CassandraConnectionFactory.scala:88)
    ...
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.IllegalStateException: Detected Guava issue #1635 which indicates that a version of Guava less than 16.01 is in use.  This introduces codec resolution issues and potentially other incompatibility issues in the driver.  Please upgrade to Guava 16.01 or later.
    at com.datastax.driver.core.SanityChecks.checkGuava(SanityChecks.java:62)
    at com.datastax.driver.core.SanityChecks.check(SanityChecks.java:36)
    at com.datastax.driver.core.Cluster.<clinit>(Cluster.java:67)
    ... 21 more

It looks like Spark 1.6.0 uses Guava 14.0 (http://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10/1.6.0), while the Spark Cassandra Connector (on GitHub) uses Guava 16.0 (http://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector_2.10/1.6.0-M2). That conflict is why shading is required here.
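If you want to confirm which jar is actually supplying Guava at runtime, you can ask the JVM where a Guava class was loaded from. This is a generic JVM trick rather than anything specific to the connector; a small sketch:

// Print the jar that provided Guava's Optional class on the current classpath
object GuavaOrigin {
  def main(args: Array[String]): Unit = {
    val source = classOf[com.google.common.base.Optional[_]]
      .getProtectionDomain
      .getCodeSource
    println(if (source != null) source.getLocation else "loaded by the bootstrap classloader")
  }
}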

I have uploaded the Spark Cassandra Guava shaded jar here. You can use it along with your Spark dependencies.

References:

http://stackoverflow.com/questions/13620281/what-is-the-maven-shade-plugin-used-for-and-why-would-you-want-to-relocate-java

Hope you enjoyed this tutorial.