Spark Based Data Fountain Advanced Analytics Framework [or] How to Connect to RDBMS DataSources through Spark DataFrame/JDBC APIs

Today I wanted to try an interesting use case: doing some analytics on the raw feed of a table from an Oracle database.  My thought process always navigates toward the Spark way of connecting systems instead of the Sqoop or Hadoop way.  I have listed some of the benefits I realized after building this use case.

I set the use case goals as below.

  1. Create a modern, ultra-speed Data Lake
  2. Work on the raw data without depending on the Data Lake
  3. Fetch policyid and policynumber from a policy table (Oracle)
  4. Fetch based on both the entire table and a specific SQL query
  5. Do some ETL
  6. Write/insert a separate result set into an Oracle table


I am going to call this the Spark Based Data Fountain Advanced Analytics Framework 🙂


  • Blue Arrow –> Represents –> Data flow with no ETL/raw feed work
  • Yellow Arrow –> Represents –> Data flow with ETL work
  • Data Extraction Layer –> Gives you the ability to extract a one-time/delta feed from sources through Spark JDBC APIs
  • Persistency Layer –> Gives you the ability to store raw/ETLed data in a SQL-based or NoSQL-based Data Lake
  • Application Layer –> You can build your data analytics applications with raw data accessed directly from the sources via Spark DataFrame APIs in memory. You no longer need to depend on the storage layer.
  • Visualization Layer –> Gives you the ability to plot, graph, and project your data on various visualization tools
  • With this, we can also replicate the Data Lake data to other clusters in the organization through Spark Streaming or Spark DataFrame APIs (not explained here)
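As a sketch of the Persistency Layer above: a raw feed pulled over JDBC can be landed in the data lake as Parquet in a couple of lines. This is a minimal sketch only; the source table and output path are hypothetical placeholders, and it assumes a running SQLContext plus the Oracle JDBC URL used in the full program below.

```scala
import java.util.Properties

// Sketch, not a full program: assumes `sqlContext` exists and the ojdbc driver
// is on the classpath. Table name and output path are hypothetical.
val raw =
  "jdbc:oracle:thin:username/password@localhost:1521/schemaname",
  "pasadm.policysummary",                 // hypothetical source table
  new Properties())
raw.write.parquet("/datalake/raw/policysummary")  // raw zone of the Data Lake
```

Once the raw zone exists, downstream ETL jobs can read the Parquet files instead of hitting Oracle again.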

Approach Used:

  1. Include the ojdbc7.jar dependency first in the IntelliJ IDEA settings.
  2. Connect to the Oracle DB with the proper server URL, username, and password through the SQLContext of the Spark API.
  3. Filter a few records of the table into two separate Spark DataFrames.
  4. Join the two DataFrames through the join method.
  5. Produce output in a Hive table or Elasticsearch (implementation not given here).
  6. Write some random collection output to a test Oracle table.
  7. Connect to a MySQL database and get the data into a Spark DataFrame (implementation not given here).
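Step 5 is left unimplemented in the program below. As a hedged sketch of what the Hive-table output could look like (in Spark 1.x terms, matching the rest of this post): it assumes a Hive-enabled Spark build, an existing SparkContext `sc`, a DataFrame `df` created through the HiveContext holding the ETL result, and a hypothetical target table name.

```scala
import org.apache.spark.sql.hive.HiveContext

// Sketch only: requires a Hive-enabled Spark build. `df` stands for the
// ETL result DataFrame and must have been created through this HiveContext;
// the target table name is a hypothetical placeholder.
val hiveContext = new HiveContext(sc)
val df = hiveContext.table("pasadm.policysummary")            // hypothetical input
df.write.mode("overwrite").saveAsTable("analytics.policy_result")
```

An Elasticsearch sink would instead go through the elasticsearch-hadoop connector, which is outside the scope of this post.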

Full Program:

import java.util.Properties

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by Varatharajan Giri Ramanathan on 2/24/2016.
 */
object SparkJdbcConnect {

  // Case class for the sample rows written back to Oracle.
  // Defined at object level so toDF() can derive the schema.
  case class Conf(id: String, policynumber: String)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkConnectJDBC")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val prop = new Properties()
    prop.setProperty("user", "username")
    prop.setProperty("password", "password")

    val oracleJdbcUrl = "jdbc:oracle:thin:username/password@localhost:1521/schemaname"
    val mysqlJdbcUrl = "jdbc:mysql://localhost:3306/hadoopdb"

    // Include ojdbc7.jar in the Dependency Window (or pass it with --jars).
    // Below example --> the data is loaded into 10 partitions, so the first
    // partition holds records whose partition-column values fall roughly within
    // 1 to 200, and each round trip fetches 100 records at a time.
    val pasTable = "pasadm.policysummary"
    val pasQuery = "(select ID, POLICYNUMBER from dbname.testpolicytable where rownum <= 100)"
    val tablePart = "ID"

    val policySummary ="jdbc")
        "url"             -> oracleJdbcUrl,
        "dbtable"         -> pasQuery,   // Query Based Data Retrieval
        // "dbtable"      -> pasTable,   // Entire Table Based Data Retrieval
        "partitionColumn" -> tablePart,
        "lowerBound"      -> "1",
        "upperBound"      -> "2000",
        "numPartitions"   -> "10",
        "fetchSize"       -> "100"))

    // Below is another way to read Oracle through Spark JDBC:
    // val policySummary =, "dbname.tablename", prop)

    // Doing some ETL
    // val p1 = policySummary.where(policySummary("POLICYNUMBER") === "POL000001")
    // val p2 = policySummary.where(policySummary("POLICYNUMBER") === "POL000002")
    // val joined = p1.join(p2, p1("POLICYNUMBER") <=> p2("POLICYNUMBER"))
    // joined.collect().foreach(println)

    // Writing the output into Oracle
    import sqlContext.implicits._
    val data = sc.parallelize(Seq(
      Conf("1", "AZPOL123456"), Conf("2", "CAPOL123456"),
      Conf("3", "NCPOL123456"), Conf("4", "INPOL123456")))
    val df = data.toDF()
    df.write.mode("append").jdbc(oracleJdbcUrl, "dbname.testresulttable", prop) // hypothetical target table

    // MySQL read (implementation not given here; see mysqlJdbcUrl above).

    sc.stop()
  }
}
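To make the partitioning comment in the program concrete, here is a standalone sketch that mirrors the stride arithmetic Spark's JDBC relation uses to turn lowerBound/upperBound/numPartitions into per-partition WHERE clauses. This is a simplified illustration (Spark's actual implementation also handles nulls and uneven strides), using the same column and bounds as the example above.

```scala
object JdbcPartitionSketch {
  // Simplified version of Spark's JDBC range partitioning: split the range
  // [lowerBound, upperBound) into numPartitions stride-sized WHERE clauses.
  def partitionClauses(column: String, lowerBound: Long, upperBound: Long,
                       numPartitions: Int): Seq[String] = {
    val stride = upperBound / numPartitions - lowerBound / numPartitions
    (0 until numPartitions).map { i =>
      val lower = lowerBound + i * stride
      val upper = lower + stride
      if (i == 0) s"$column < $upper"                         // open lower end
      else if (i == numPartitions - 1) s"$column >= $lower"   // open upper end
      else s"$column >= $lower AND $column < $upper"
    }
  }

  def main(args: Array[String]): Unit = {
    // Same values as the example: lowerBound=1, upperBound=2000, numPartitions=10
    partitionClauses("ID", 1L, 2000L, 10).foreach(println)
  }
}
```

With lowerBound 1, upperBound 2000, and 10 partitions the stride is 200, so the first partition issues roughly `ID < 201` and the last `ID >= 1801`, which is why the first partition holds IDs 1 to about 200.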

Benefits I realized (performance yet to be explored):

  • Simple architecture and approach, i.e. Data Sources –> Spark JDBC/ODBC –> Spark DataFrames, RDDs, ML Pipelines, GraphX –> Visualizations (ELK, Tableau, Cognos, etc.)
  • Direct access to data sources in a short span of time
  • Utilization of advanced analytics capabilities, using Spark DataFrame APIs and machine learning libraries to work on data sources directly
  • Schema-on-write ETLs
  • Solve different data problems on the fly using Spark's advanced analytics libraries: MLlib, GraphX, the Streaming APIs, etc.
  • I am assuming 1,000 Oracle tables can easily be loaded into a sandbox cluster within an hour or so, depending on the text or Parquet file format used
  • Bindings are available in Python, Scala, Java, and R
  • I am realizing that Sqoop queries are not required to build the Data Lake
  • The Spark Tungsten optimizer does all the hard work in terms of optimization
  • Less code and faster implementations
  • Faster dynamic partitioning of the data
  • The funny thing is that, without hitting the persistent storage of the Data Lake (Hive based), we can build any application through Spark JDBC and DataFrame APIs in a short span of time
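Step 7 of the approach (reading MySQL into a DataFrame) was not implemented above. A minimal sketch, assuming the mysqlJdbcUrl from the program, the MySQL Connector/J jar on the classpath, and a hypothetical table name:

```scala
// Sketch only: assumes `sqlContext` exists, the MySQL Connector/J driver is on
// the classpath, and the credentials/table name are hypothetical placeholders.
val mysqlDf ="jdbc")
    "url"      -> "jdbc:mysql://localhost:3306/hadoopdb",
    "driver"   -> "com.mysql.jdbc.Driver",
    "dbtable"  -> "hadoopdb.employees",   // hypothetical table
    "user"     -> "username",
    "password" -> "password"))
mysqlDf.printSchema()
```

The same partitionColumn/lowerBound/upperBound/numPartitions options shown for Oracle apply unchanged here, since they are handled by Spark's JDBC source rather than the database driver.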

So why wait? Go ahead and build a Spark-based Data Lake.

