Skip to content

Commit

Permalink
Add use cases scenarios for data migration and data cleaning
Browse files Browse the repository at this point in the history
  • Loading branch information
doanduyhai committed Apr 19, 2015
1 parent c70a41a commit 36923c4
Show file tree
Hide file tree
Showing 6 changed files with 298 additions and 0 deletions.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,23 @@ There are 2 packages with 2 distinct demos
</ol>
</li>
</ul>

* _usecases_
<ul>
<br/>
<br/>
These scenarios exemplify how Spark can be used to achieve various real-world use cases
<br/>
<li> Scenarios
<ol>
<li> **CrossClusterDataMigration** : this is a sample code to show how to perform effective cross cluster operations. **DO NOT EXECUTE IT**</li>
<li> **CrossDCDataMigration** : this is a sample code to show how to perform effective cross data-centers operations. **DO NOT EXECUTE IT**</li>
<li> **DataCleaningForPerformers** : in this scenario, we read data from the `performers` table to clean up empty _country_ values and reformat the _born_ and _died_ dates, if present. The data are saved back into Cassandra, thus achieving **perfect data locality**</li>
<li> **DisplayPerformersData** : a utility class to show data **before** and **after** the cleaning</li>
<li> **MigrateAlbumsData** : in this scenario, we read source data from `albums` and save them back into a new table `albums_by_country`, purposely built for fast queries on country and year</li>
</ol>
</li>
</ul>

* _weather.data.demo_
<ul>
Expand Down
41 changes: 41 additions & 0 deletions src/main/scala/usecases/CrossClusterDataMigration.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package usecases

import analytics.music.Schema._
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.{CassandraConnectorConf, CassandraConnector}
import com.datastax.spark.connector.rdd.CassandraTableScanRDD
import com.datastax.spark.connector.writer.RowWriterFactory
import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.matching.Regex

/**
 * Sample code showing how to migrate a table between two distinct Cassandra
 * clusters from a single Spark job: read with the default connector (cluster 1),
 * write by passing a second [[CassandraConnector]] (cluster 2) explicitly.
 *
 * DO NOT EXECUTE THIS OBJECT — it is only a code sample; the transformation
 * function is left as `???` and the hosts/master are placeholders.
 */
object CrossClusterDataMigration {

  case class Performer(name: String, country: String, gender: String, `type`: String, born: String, died: String, styles: Set[String])

  // DO NOT EXECUTE THIS CLASS, IT IS ONLY A CODE SAMPLE
  @deprecated
  def main(args: Array[String]) {

    // Connector configuration pointing to the SOURCE cluster.
    val confCluster1 = new SparkConf(true)
      .setAppName("cross_cluster_data_migration")
      .setMaster("master_ip")
      .set("spark.cassandra.connection.host", "cluster_1_hostnames")

    // Connector configuration pointing to the TARGET cluster.
    val confCluster2 = new SparkConf(true)
      .setAppName("data_migration")
      .setMaster("master_ip")
      .set("spark.cassandra.connection.host", "cluster_2_hostnames")

    val sc = new SparkContext(confCluster1)

    // NOTE: the explicit implicit-argument list MUST stay on the same
    // statement as saveToCassandra. If the `(connector, rowWriterFactory)`
    // tuple is placed on its own line, Scala's semicolon inference turns it
    // into a separate, discarded expression and the write silently uses the
    // default connector — i.e. the data would be written back to cluster 1.
    sc.cassandraTable[Performer](KEYSPACE, PERFORMERS)
      .map[Performer](???) // transformation left to the reader
      .saveToCassandra(KEYSPACE, PERFORMERS)(CassandraConnector(confCluster2), implicitly[RowWriterFactory[Performer]])

  }

}
39 changes: 39 additions & 0 deletions src/main/scala/usecases/CrossDCDataMigration.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package usecases

import analytics.music.Schema._
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.writer.RowWriterFactory
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Sample code showing how to migrate a table across two data centers of the
 * same cluster: read pinned to DC_1 via `connection.local_dc`, write by passing
 * a second [[CassandraConnector]] pinned to DC_2 explicitly.
 *
 * DO NOT EXECUTE THIS OBJECT — it is only a code sample; the transformation
 * function is left as `???` and the hosts/master are placeholders.
 */
object CrossDCDataMigration {

  case class Performer(name: String, country: String, gender: String, `type`: String, born: String, died: String, styles: Set[String])

  // DO NOT EXECUTE THIS CLASS, IT IS ONLY A CODE SAMPLE
  @deprecated
  def main(args: Array[String]) {

    // Connector configuration pinned to the SOURCE data center.
    val confDC1 = new SparkConf(true)
      .setAppName("cross_DC_data_migration")
      .setMaster("master_ip")
      .set("spark.cassandra.connection.host", "DC_1_hostnames")
      .set("spark.cassandra.connection.local_dc", "DC_1")

    // Connector configuration pinned to the TARGET data center.
    val confDC2 = new SparkConf(true)
      .setAppName("data_migration")
      .setMaster("master_ip")
      .set("spark.cassandra.connection.host", "DC_2_hostnames")
      .set("spark.cassandra.connection.local_dc", "DC_2")

    val sc = new SparkContext(confDC1)

    // NOTE: the explicit implicit-argument list MUST stay on the same
    // statement as saveToCassandra. On its own line, semicolon inference makes
    // the `(connector, rowWriterFactory)` tuple a discarded expression, and
    // the write silently goes back to DC_1 via the default connector.
    sc.cassandraTable[Performer](KEYSPACE, PERFORMERS)
      .map[Performer](???) // transformation left to the reader
      .saveToCassandra(KEYSPACE, PERFORMERS)(CassandraConnector(confDC2), implicitly[RowWriterFactory[Performer]])

  }

}
88 changes: 88 additions & 0 deletions src/main/scala/usecases/DataCleaningForPerformers.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package usecases

import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd.CassandraTableScanRDD
import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SparkConf, SparkContext}
import analytics.music.Schema._

import scala.util.matching.Regex

/**
 * Cleans the `performers` table in place:
 *  - blank `country` values are replaced with "Unknown";
 *  - `born` / `died` dates given as "YYYY" or "YYYY-MM" are padded to full
 *    "YYYY-MM-DD" dates.
 *
 * Each cleanup pass writes only the primary key plus the single repaired
 * column (via SomeColumns), which Cassandra treats as a partial upsert —
 * equivalent to UPDATE performers SET col = ... WHERE name = ...
 */
object DataCleaningForPerformers {

  /**
   *
   * CREATE TABLE IF NOT EXISTS performers (
   *   name TEXT,
   *   country TEXT,
   *   gender TEXT,
   *   type TEXT,
   *   born TEXT,
   *   died TEXT,
   *   styles LIST<TEXT>,
   *   PRIMARY KEY (name)
   * );
   */
  // Dates stored as bare year ("1967") or year-month ("1967-05").
  val YearPattern = "^[0-9]{4}$".r
  val YearMonthPattern = "^[0-9]{4}-[0-9]{2}$".r

  // NOTE(review): the schema above declares styles as LIST<TEXT> but the bean
  // maps it to Set[String] — confirm the connector conversion is intended.
  case class Performer(name: String, country: String, gender: String, `type`: String, born: String, died: String, styles: Set[String])

  def main(args: Array[String]) {

    val conf = new SparkConf(true)
      .setAppName("data_cleaning")
      .setMaster("local")
      .set("spark.cassandra.connection.host", "localhost")

    val sc = new SparkContext(conf)

    println("\n ------------ Cleaning data ----------------- \n")

    // Cache the scan: the same RDD feeds five independent cleanup passes.
    val cache: CassandraTableScanRDD[Performer] = sc
      .cassandraTable[Performer](KEYSPACE, PERFORMERS).cache()

    //Fix blank country
    cache
      .filter(bean => StringUtils.isBlank(bean.country))
      .map(bean => (bean.name, "Unknown"))
      /**
       * Equivalent to INSERT INTO performers(name,country) VALUES(xxx,yyy)
       * equivalent to UPDATE performers SET country = yyy WHERE name=xxx
       */
      .saveToCassandra(KEYSPACE, PERFORMERS, SomeColumns("name", "country"))

    //Normalize born date: "YYYY" -> "YYYY-01-01", "YYYY-MM" -> "YYYY-MM-01"
    cache
      .filter(bean => matchPattern(bean.born, YearPattern))
      .map(bean => (bean.name, bean.born + "-01-01"))
      .saveToCassandra(KEYSPACE, PERFORMERS, SomeColumns("name", "born"))

    cache
      .filter(bean => matchPattern(bean.born, YearMonthPattern))
      .map(bean => (bean.name, bean.born + "-01"))
      .saveToCassandra(KEYSPACE, PERFORMERS, SomeColumns("name", "born"))

    //Normalize died date, same padding rules as for born
    cache
      .filter(bean => matchPattern(bean.died, YearPattern))
      .map(bean => (bean.name, bean.died + "-01-01"))
      .saveToCassandra(KEYSPACE, PERFORMERS, SomeColumns("name", "died"))

    cache
      .filter(bean => matchPattern(bean.died, YearMonthPattern))
      .map(bean => (bean.name, bean.died + "-01"))
      .saveToCassandra(KEYSPACE, PERFORMERS, SomeColumns("name", "died"))

    sc.stop()
  }

  /** True when `input` is non-blank and contains a match for `pattern`. */
  private def matchPattern(input: String, pattern: Regex): Boolean =
    StringUtils.isNotBlank(input) && pattern.findFirstIn(input).isDefined

}
49 changes: 49 additions & 0 deletions src/main/scala/usecases/DisplayPerformersData.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package usecases

import analytics.music.Schema._
import com.datastax.spark.connector._
import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Utility to inspect the `performers` table before/after cleaning: prints the
 * distinct, sorted set of non-blank `died` values.
 */
object DisplayPerformersData {

  /**
   *
   * CREATE TABLE IF NOT EXISTS performers (
   *   name TEXT,
   *   country TEXT,
   *   gender TEXT,
   *   type TEXT,
   *   born TEXT,
   *   died TEXT,
   *   styles LIST<TEXT>,
   *   PRIMARY KEY (name)
   * );
   */

  case class Performer(name: String, country: String, gender: String, `type`: String, born: String, died: String, styles: Set[String])

  def main(args: Array[String]) {

    val conf = new SparkConf(true)
      .setAppName("data_display")
      .setMaster("local")
      .set("spark.cassandra.connection.host", "localhost")

    val sc = new SparkContext(conf)

    // BUG FIX: the original did `.sorted.toSet.foreach(...)` — converting the
    // sorted array to an unordered Set destroyed the ordering before printing.
    // Deduplicate first, then sort, so the output really is sorted.
    sc.cassandraTable[Performer](KEYSPACE, PERFORMERS)
      .filter(bean => StringUtils.isNotBlank(bean.died))
      .map(bean => bean.died)
      .collect()
      .distinct
      .sorted
      .foreach(println)

    sc.stop()
  }

}
64 changes: 64 additions & 0 deletions src/main/scala/usecases/MigrateAlbumsData.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package usecases

import analytics.music.Schema._
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Migrates all rows from `albums` into a query-oriented table
 * `albums_by_country`, partitioned by (country, year) for fast lookups.
 * The target table is created on the fly if it does not exist.
 */
object MigrateAlbumsData {

  /**
   *
   * CREATE TABLE spark_demo.albums (
   *   title text PRIMARY KEY,
   *   country text,
   *   performer text,
   *   quality text,
   *   status text,
   *   year int
   * )
   *
   * CREATE TABLE spark_demo.albums_by_country (
   *   title text,
   *   country text,
   *   performer text,
   *   quality text,
   *   status text,
   *   year int,
   *   PRIMARY KEY((country,year),title)
   * )
   */

  case class Albums(title: String, country: String, performer: String, quality: String, status: String, year: Int)

  def main(args: Array[String]) {

    val conf = new SparkConf(true)
      .setAppName("data_migration")
      .setMaster("local")
      .set("spark.cassandra.connection.host", "localhost")

    val sc = new SparkContext(conf)

    // Create the target table before writing. The original called .stripMargin
    // on a string with no '|' margin markers (a no-op); margins added so the
    // call actually strips the indentation it appears to.
    // NOTE(review): keyspace is hard-coded as spark_demo here while the scan
    // below uses the KEYSPACE constant — confirm they match.
    CassandraConnector(conf).withSessionDo {
      session => session.execute(
        """
          |CREATE TABLE IF NOT EXISTS spark_demo.albums_by_country (
          |    title text,
          |    country text,
          |    performer text,
          |    quality text,
          |    status text,
          |    year int,
          |    PRIMARY KEY((country,year),title)
          |)
        """.stripMargin)
    }

    // Full scan of the source table, written straight into the new layout;
    // column names match by bean field name.
    sc.cassandraTable[Albums](KEYSPACE, ALBUMS)
      .saveToCassandra(KEYSPACE, ALBUMS_BY_COUNTRY)

    sc.stop()
  }

}

0 comments on commit 36923c4

Please sign in to comment.