diff --git a/README.md b/README.md
index 61e72c6..51d8c52 100644
--- a/README.md
+++ b/README.md
@@ -58,6 +58,26 @@ There are 2 packages with 2 distinct demos
+
+* _usecases_
+
+  These scenarios exemplify how Spark can be used to achieve various real-world use cases
+
+ - Scenarios
+
+    - **CrossClusterDataMigration** : sample code showing how to perform efficient cross-cluster operations. **DO NOT EXECUTE IT**
+    - **CrossDCDataMigration** : sample code showing how to perform efficient cross-data-center operations. **DO NOT EXECUTE IT**
+    - **DataCleaningForPerformers** : in this scenario, we read data from the `performers` table, fill in empty _country_ values and reformat the _born_ and _died_ dates when present. The data is saved back into Cassandra, thus achieving **perfect data locality**
+    - **DisplayPerformersData** : a utility class to show data **before** and **after** the cleaning
+    - **MigrateAlbumsData** : in this scenario, we read source data from `albums` and save it into a new table `albums_by_country`, purpose-built for fast queries by country and year (see the example query below)
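+
+  Once `albums_by_country` is populated, any query hitting its partition key (_country_, _year_) is a cheap single-partition read. An illustrative query, assuming the sample `spark_demo` keyspace:
+
+  ```sql
+  SELECT title, performer FROM spark_demo.albums_by_country
+  WHERE country = 'France' AND year = 1990;
+  ```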
+
* _weather.data.demo_
diff --git a/src/main/scala/usecases/CrossClusterDataMigration.scala b/src/main/scala/usecases/CrossClusterDataMigration.scala
new file mode 100644
index 0000000..b1f7403
--- /dev/null
+++ b/src/main/scala/usecases/CrossClusterDataMigration.scala
@@ -0,0 +1,40 @@
+package usecases
+
+import analytics.music.Schema._
+import com.datastax.spark.connector._
+import com.datastax.spark.connector.cql.CassandraConnector
+import com.datastax.spark.connector.writer.RowWriterFactory
+import org.apache.spark.{SparkConf, SparkContext}
+
+object CrossClusterDataMigration {
+
+  case class Performer(name: String, country: String, gender: String, `type`: String, born: String, died: String, styles: Set[String])
+
+ // DO NOT EXECUTE THIS CLASS, IT IS ONLY A CODE SAMPLE
+ @deprecated
+ def main (args: Array[String]) {
+
+ val confCluster1 = new SparkConf(true)
+ .setAppName("cross_cluster_data_migration")
+ .setMaster("master_ip")
+ .set("spark.cassandra.connection.host", "cluster_1_hostnames")
+
+ val confCluster2 = new SparkConf(true)
+ .setAppName("data_migration")
+ .setMaster("master_ip")
+ .set("spark.cassandra.connection.host", "cluster_2_hostnames")
+
+
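+    // sc below is built from confCluster1 (the source cluster); confCluster2 is
+    // only used to build the CassandraConnector that the write targets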
+ val sc = new SparkContext(confCluster1)
+
+    sc.cassandraTable[Performer](KEYSPACE, PERFORMERS)
+      .map[Performer](???) // per-row transformation intentionally left unimplemented
+      // saveToCassandra declares (connector, rowWriterFactory) as its implicit
+      // parameter list; passing it explicitly redirects the write to cluster 2
+      .saveToCassandra(KEYSPACE, PERFORMERS)(CassandraConnector(confCluster2), implicitly[RowWriterFactory[Performer]])
+
+ }
+
+}
diff --git a/src/main/scala/usecases/CrossDCDataMigration.scala b/src/main/scala/usecases/CrossDCDataMigration.scala
new file mode 100644
index 0000000..e7dd35f
--- /dev/null
+++ b/src/main/scala/usecases/CrossDCDataMigration.scala
@@ -0,0 +1,42 @@
+package usecases
+
+import analytics.music.Schema._
+import com.datastax.spark.connector._
+import com.datastax.spark.connector.cql.CassandraConnector
+import com.datastax.spark.connector.writer.RowWriterFactory
+import org.apache.spark.{SparkConf, SparkContext}
+
+object CrossDCDataMigration {
+
+  case class Performer(name: String, country: String, gender: String, `type`: String, born: String, died: String, styles: Set[String])
+
+ // DO NOT EXECUTE THIS CLASS, IT IS ONLY A CODE SAMPLE
+ @deprecated
+ def main (args: Array[String]) {
+
+ val confDC1 = new SparkConf(true)
+ .setAppName("cross_DC_data_migration")
+ .setMaster("master_ip")
+ .set("spark.cassandra.connection.host", "DC_1_hostnames")
+ .set("spark.cassandra.connection.local_dc", "DC_1")
+
+ val confDC2 = new SparkConf(true)
+ .setAppName("data_migration")
+ .setMaster("master_ip")
+ .set("spark.cassandra.connection.host", "DC_2_hostnames")
+ .set("spark.cassandra.connection.local_dc", "DC_2")
+
+
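+    // sc below is built from confDC1 (reads pinned to DC_1); confDC2, pinned to
+    // DC_2 via local_dc, is only used to build the connector the write targets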
+ val sc = new SparkContext(confDC1)
+
+    sc.cassandraTable[Performer](KEYSPACE, PERFORMERS)
+      .map[Performer](???) // per-row transformation intentionally left unimplemented
+      // saveToCassandra declares (connector, rowWriterFactory) as its implicit
+      // parameter list; passing it explicitly redirects the write to DC_2
+      .saveToCassandra(KEYSPACE, PERFORMERS)(CassandraConnector(confDC2), implicitly[RowWriterFactory[Performer]])
+
+ }
+
+}
diff --git a/src/main/scala/usecases/DataCleaningForPerformers.scala b/src/main/scala/usecases/DataCleaningForPerformers.scala
new file mode 100644
index 0000000..7fe3480
--- /dev/null
+++ b/src/main/scala/usecases/DataCleaningForPerformers.scala
@@ -0,0 +1,86 @@
+package usecases
+
+import com.datastax.spark.connector._
+import com.datastax.spark.connector.rdd.CassandraTableScanRDD
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.{SparkConf, SparkContext}
+import analytics.music.Schema._
+
+import scala.util.matching.Regex
+
+object DataCleaningForPerformers {
+
+ /**
+ *
+ * CREATE TABLE IF NOT EXISTS performers (
+ * name TEXT,
+ * country TEXT,
+ * gender TEXT,
+ * type TEXT,
+ * born TEXT,
+ * died TEXT,
+   *   styles LIST<TEXT>,
+ * PRIMARY KEY (name)
+ * );
+ */
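+  // YearPattern matches bare years ("1965"); YearMonthPattern matches
+  // year-month values ("1965-03"); both are padded to full dates below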
+ val YearPattern = "^[0-9]{4}$".r
+ val YearMonthPattern = "^[0-9]{4}-[0-9]{2}$".r
+  case class Performer(name: String, country: String, gender: String, `type`: String, born: String, died: String, styles: Set[String])
+
+ def main (args: Array[String]) {
+
+ val conf = new SparkConf(true)
+ .setAppName("data_cleaning")
+ .setMaster("local")
+ .set("spark.cassandra.connection.host", "localhost")
+
+ val sc = new SparkContext(conf)
+
+ println("\n ------------ Cleaning data ----------------- \n")
+
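+    // cache() keeps the scanned rows in memory, so the five cleaning passes
+    // below reuse them instead of re-reading the table from Cassandra each time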
+ val cache: CassandraTableScanRDD[Performer] = sc
+ .cassandraTable[Performer](KEYSPACE, PERFORMERS).cache()
+
+ //Fix blank country
+ cache
+ .filter(bean => StringUtils.isBlank(bean.country))
+ .map(bean => (bean.name,"Unknown"))
+      /**
+       * Equivalent to INSERT INTO performers(name, country) VALUES(xxx, yyy),
+       * i.e. UPDATE performers SET country = yyy WHERE name = xxx
+       */
+ .saveToCassandra(KEYSPACE, PERFORMERS,SomeColumns("name","country"))
+
+ //Normalize born date
+ cache
+ .filter(bean => matchPattern(bean.born,YearPattern))
+ .map(bean => (bean.name,bean.born+"-01-01"))
+ .saveToCassandra(KEYSPACE, PERFORMERS,SomeColumns("name","born"))
+
+ cache
+ .filter(bean => matchPattern(bean.born,YearMonthPattern))
+ .map(bean => (bean.name,bean.born+"-01"))
+ .saveToCassandra(KEYSPACE, PERFORMERS,SomeColumns("name","born"))
+
+ //Normalize died date
+ cache
+ .filter(bean => matchPattern(bean.died,YearPattern))
+ .map(bean => (bean.name,bean.died+"-01-01"))
+ .saveToCassandra(KEYSPACE, PERFORMERS,SomeColumns("name","died"))
+
+ cache
+ .filter(bean => matchPattern(bean.died,YearMonthPattern))
+ .map(bean => (bean.name,bean.died+"-01"))
+ .saveToCassandra(KEYSPACE, PERFORMERS,SomeColumns("name","died"))
+
+ sc.stop()
+ }
+
+  private def matchPattern(input: String, pattern: Regex): Boolean =
+    StringUtils.isNotBlank(input) && pattern.findFirstIn(input).isDefined
+
+}
diff --git a/src/main/scala/usecases/DisplayPerformersData.scala b/src/main/scala/usecases/DisplayPerformersData.scala
new file mode 100644
index 0000000..419dc85
--- /dev/null
+++ b/src/main/scala/usecases/DisplayPerformersData.scala
@@ -0,0 +1,48 @@
+package usecases
+
+import analytics.music.Schema._
+import com.datastax.spark.connector._
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.{SparkConf, SparkContext}
+
+object DisplayPerformersData {
+
+ /**
+ *
+ * CREATE TABLE IF NOT EXISTS performers (
+ * name TEXT,
+ * country TEXT,
+ * gender TEXT,
+ * type TEXT,
+ * born TEXT,
+ * died TEXT,
+   *   styles LIST<TEXT>,
+ * PRIMARY KEY (name)
+ * );
+ */
+
+  case class Performer(name: String, country: String, gender: String, `type`: String, born: String, died: String, styles: Set[String])
+
+ def main (args: Array[String]) {
+
+ val conf = new SparkConf(true)
+ .setAppName("data_display")
+ .setMaster("local")
+ .set("spark.cassandra.connection.host", "localhost")
+
+ val sc = new SparkContext(conf)
+
+    // Print every distinct death date in sorted order. distinct() dedupes on the
+    // cluster; sorting happens driver-side after collect()
+    sc.cassandraTable[Performer](KEYSPACE, PERFORMERS)
+      .filter(bean => StringUtils.isNotBlank(bean.died))
+      .map(bean => bean.died)
+      .distinct()
+      .collect()
+      .sorted
+      .foreach(println)
+
+ sc.stop()
+ }
+
+}
diff --git a/src/main/scala/usecases/MigrateAlbumsData.scala b/src/main/scala/usecases/MigrateAlbumsData.scala
new file mode 100644
index 0000000..01e85e2
--- /dev/null
+++ b/src/main/scala/usecases/MigrateAlbumsData.scala
@@ -0,0 +1,65 @@
+package usecases
+
+import analytics.music.Schema._
+import com.datastax.spark.connector._
+import com.datastax.spark.connector.cql.CassandraConnector
+import org.apache.spark.{SparkConf, SparkContext}
+
+object MigrateAlbumsData {
+
+ /**
+ *
+ * CREATE TABLE spark_demo.albums (
+ * title text PRIMARY KEY,
+ * country text,
+ * performer text,
+ * quality text,
+ * status text,
+ * year int
+ * )
+ *
+ * CREATE TABLE spark_demo.albums_by_country (
+ * title text,
+ * country text,
+ * performer text,
+ * quality text,
+ * status text,
+ * year int,
+ * PRIMARY KEY((country,year),title)
+ * )
+ */
+
+  case class Albums(title: String, country: String, performer: String, quality: String, status: String, year: Int)
+
+ def main (args: Array[String]) {
+
+ val conf = new SparkConf(true)
+ .setAppName("data_migration")
+ .setMaster("local")
+ .set("spark.cassandra.connection.host", "localhost")
+
+ val sc = new SparkContext(conf)
+
+    CassandraConnector(conf).withSessionDo { session =>
+      session.execute(
+        """
+          |CREATE TABLE IF NOT EXISTS spark_demo.albums_by_country (
+          |  title text,
+          |  country text,
+          |  performer text,
+          |  quality text,
+          |  status text,
+          |  year int,
+          |  PRIMARY KEY((country, year), title)
+          |)
+        """.stripMargin)
+    }
+
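+    // Straight table-to-table copy: every albums row is rewritten into
+    // albums_by_country, whose ((country, year)) partition key serves the new query pattern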
+    sc.cassandraTable[Albums](KEYSPACE, ALBUMS)
+      .saveToCassandra(KEYSPACE, ALBUMS_BY_COUNTRY)
+ sc.stop()
+ }
+
+}