Add examples of analytics pipelines using music data
doanduyhai committed Apr 19, 2015
1 parent f4d5ef3 commit c70a41a
Showing 12 changed files with 124,424 additions and 0 deletions.
25 changes: 25 additions & 0 deletions README.md
@@ -33,6 +33,31 @@ There are 2 packages with 2 distinct demos
If you don't have Twitter app credentials, create a new app at <a href="https://apps.twitter.com/" target="_blank">https://apps.twitter.com/</a>
</li>
</ul>

* _analytics.music_
<ul>
<li> Data preparation
<ol>
<li> Go to the folder main/data</li>
<li> Execute <em>$CASSANDRA_HOME/bin/cqlsh -f music.cql</em> from this folder. It should create the keyspace <strong>spark_demo</strong> and some tables </li>
<li> The script then loads the content of <em>performers.csv</em> and <em>albums.csv</em> into Cassandra</li>
</ol>
</li>
<li> Scenarios
<br/>
<br/>
All examples extend the `BaseExample` trait, which configures a SparkContext and automatically truncates the example's output table
so that each example can be executed several times with consistent results
<br/>
<ol>
<li> **Example1** : in this example, we read data from the `performers` table and extract (style, performer) pairs into the `performers_by_style` table</li>
<li> **Example2** : in this example, we read data from the `performers` table and count styles per performer type. The results are saved back into the `performers_distribution_by_style` table</li>
<li> **Example3** : building on the output of **Example2**, we extract only the top 10 styles for **artists** and **groups** and save the results into the `top10_styles` table</li>
<li> **Example4** : in this example, we want to know, for each decade, the number of albums released, grouped by the performers' origin country. For this we join the `performers` table with `albums`. The results are saved back into the `albums_by_decade_and_country` table</li>
<li> **Example5** : similar to **Example4**, but we perform the join with **SparkSQL** and also filter out countries with low release counts. The results are saved back into the `albums_by_decade_and_country_sql` table</li>
</ol>
</li>
</ul>

* _weather.data.demo_
<ul>
112,835 changes: 112,835 additions & 0 deletions src/main/data/albums.csv

Large diffs are not rendered by default.

71 changes: 71 additions & 0 deletions src/main/data/music.cql
@@ -0,0 +1,71 @@
CREATE KEYSPACE IF NOT EXISTS spark_demo
WITH replication = {
'class': 'SimpleStrategy',
'replication_factor' : 1
};

USE spark_demo;

CREATE TABLE IF NOT EXISTS performers (
name TEXT,
country TEXT,
gender TEXT,
type TEXT,
born TEXT,
died TEXT,
styles LIST<TEXT>,
PRIMARY KEY (name)
);

CREATE TABLE IF NOT EXISTS albums (
performer TEXT,
title TEXT,
year INT,
country TEXT,
quality TEXT,
status TEXT,
PRIMARY KEY(title)
);

CREATE TABLE IF NOT EXISTS performers_by_style (
style TEXT,
performer TEXT,
PRIMARY KEY (style, performer)
);

CREATE TABLE IF NOT EXISTS performers_distribution_by_style (
type text,
style text,
count int,
PRIMARY KEY (type, style)
);

CREATE TABLE IF NOT EXISTS top10_styles (
type text,
count int,
style text,
PRIMARY KEY (type, count)
) WITH CLUSTERING ORDER BY (count DESC);

CREATE TABLE IF NOT EXISTS albums_by_decade_and_country(
country text,
decade text,
album_count int,
PRIMARY KEY(decade,country)
);

CREATE TABLE IF NOT EXISTS albums_by_decade_and_country_sql(
country text,
decade text,
album_count int,
PRIMARY KEY(decade,country)
);

TRUNCATE performers;
TRUNCATE albums;

COPY performers(name, country, gender, type, born, died,styles)
FROM 'performers.csv';

COPY albums(performer,title, year, country, quality, status)
FROM 'albums.csv';
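As a quick sanity check after running the script, the row counts of the two source tables can be read back with the same Spark Cassandra connector used by the examples. This is a minimal sketch (not part of the commit), assuming a local Cassandra node and the `spark_demo` keyspace created above; the object name is hypothetical:

```scala
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper, not part of this commit: counts the rows loaded by the COPY commands above
object VerifyMusicLoad {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true)
      .setAppName("verify_music_load")
      .setMaster("local")
      .set("spark.cassandra.connection.host", "localhost")
    val sc = new SparkContext(conf)

    // Row counts for the tables populated by music.cql
    println(s"performers rows: ${sc.cassandraTable("spark_demo", "performers").count()}")
    println(s"albums rows:     ${sc.cassandraTable("spark_demo", "albums").count()}")

    sc.stop()
  }
}
```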
11,100 changes: 11,100 additions & 0 deletions src/main/data/performers.csv

Large diffs are not rendered by default.

26 changes: 26 additions & 0 deletions src/main/scala/analytics/music/BaseExample.scala
@@ -0,0 +1,26 @@
package analytics.music

import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.{SparkConf, SparkContext}
import analytics.music.Schema._
import analytics.music.Constants._

trait BaseExample {

def buildSparkContext(exerciseName: String):SparkContext = {

val conf = new SparkConf(true)
.setAppName(exerciseName)
.setMaster(LOCAL_MODE)
.set(CASSANDRA_HOST_NAME_PARAM, CASSANDRA_IP)


// Truncate this example's output table (if any) so the example can be rerun with consistent results
if (Constants.TABLES.contains(exerciseName)) {
CassandraConnector(conf).withSessionDo {
session => session.execute(s"TRUNCATE ${KEYSPACE}.${Constants.TABLES(exerciseName)}")
}
}

new SparkContext(conf)
}
}
23 changes: 23 additions & 0 deletions src/main/scala/analytics/music/Constants.scala
@@ -0,0 +1,23 @@
package analytics.music

import analytics.music.Schema._

object Constants {

val CASSANDRA_HOST_NAME_PARAM = "spark.cassandra.connection.host"
val CASSANDRA_IP = "localhost"
val LOCAL_MODE = "local"

val EXAMPLE_1 = "example1"
val EXAMPLE_2 = "example2"
val EXAMPLE_3 = "example3"
val EXAMPLE_4 = "example4"
val EXAMPLE_5 = "example5"
val INTERACTIVE_ANALYTICS_WITH_DSE = "interactive_analytics_with_DSE"

val TABLES = Map( EXAMPLE_1 -> PERFORMERS_BY_STYLE,
EXAMPLE_2 -> PERFORMERS_DISTRIBUTION_BY_STYLE,
EXAMPLE_3 -> TOP_10_STYLES,
EXAMPLE_4 -> ALBUMS_BY_DECADE_AND_COUNTRY,
EXAMPLE_5 -> ALBUMS_BY_DECADE_AND_COUNTRY_SQL)
}
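The `Schema` object imported above (and in every example) is not rendered in this view. A plausible minimal reconstruction, inferred from music.cql and from the names referenced in `Constants` and the `Example*` objects, would look like the following; this is an assumption, not the committed file:

```scala
package analytics.music

// Hypothetical reconstruction of Schema.scala: the committed file is not rendered in this diff.
// Names are inferred from music.cql and from the references in Constants and the Example* objects.
object Schema {
  val KEYSPACE = "spark_demo"

  val PERFORMERS = "performers"
  val ALBUMS = "albums"
  val PERFORMERS_BY_STYLE = "performers_by_style"
  val PERFORMERS_DISTRIBUTION_BY_STYLE = "performers_distribution_by_style"
  val TOP_10_STYLES = "top10_styles"
  val ALBUMS_BY_DECADE_AND_COUNTRY = "albums_by_decade_and_country"
  val ALBUMS_BY_DECADE_AND_COUNTRY_SQL = "albums_by_decade_and_country_sql"
}
```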
42 changes: 42 additions & 0 deletions src/main/scala/analytics/music/Example1.scala
@@ -0,0 +1,42 @@
package analytics.music

import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd.CassandraRDD
import org.apache.spark.rdd.RDD
import analytics.music.Constants._
import analytics.music.Schema._

object Example1 extends BaseExample {

def main (args: Array[String]) {


val sc = buildSparkContext(EXAMPLE_1)

/*
* performers table structure
*
* CREATE TABLE IF NOT EXISTS performers (
* name TEXT,
* country TEXT,
* gender TEXT,
* type TEXT,
* born TEXT,
* died TEXT,
* styles LIST<TEXT>,
* PRIMARY KEY (name)
* );
*/
val rdd:CassandraRDD[CassandraRow] = sc.cassandraTable(KEYSPACE, PERFORMERS)

val performerAndStyles:RDD[(String,String)] =
rdd
.map[(String,List[String])](row => (row.getString("name"),row.getList[String]("styles").toList))
.flatMap[(String,String)]{ case (performer,styles) => styles.map(style => (style,performer))}

//Save data back to Cassandra
performerAndStyles.saveToCassandra(KEYSPACE,PERFORMERS_BY_STYLE,SomeColumns("style","performer"))

sc.stop()
}
}
59 changes: 59 additions & 0 deletions src/main/scala/analytics/music/Example2.scala
@@ -0,0 +1,59 @@
package analytics.music

import com.datastax.spark.connector._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import analytics.music.Constants._
import analytics.music.Schema._

object Example2 extends BaseExample {

def main (args: Array[String]) {

val sc = buildSparkContext(EXAMPLE_2)

/*
* Read columns "type" and "styles" from table 'performers'
* and save them as (String,List[String]) RDDs
* using the .as((_:String,_:List[String])) type conversion function
* Normalize the performer type (Person -> artist, Group -> group) and filter out 'Unknown' types
*/
val rdd:RDD[(String,List[String])] = sc.cassandraTable(KEYSPACE, PERFORMERS)
.select("type","styles")
.as((_:String,_:List[String]))
.map{case(performer_type,style) => (PERFORMERS_TYPES.getOrElse(performer_type,"Unknown"),style)}
.filter{case(performer_type,_) => performer_type != "Unknown"}


/*
* Transform the previous tuple RDDs into a key/value RDD (PairRDD) of type
* ((String,String),Integer). The (String,String) pair is the key(performer type,style)
* The Integer value should be set to 1 for each element of the RDD
*/
val pairs:RDD[((String,String),Int)] = rdd.flatMap{
case(performer_type,styles)=>styles.map(style => ((performer_type,style),1))
}

/*
* Reduce the previous tuple of ((performer type,style),1) by
* adding up all the 1's into a ((performer type,style),count)
*/
val reduced: RDD[((String, String), Int)] = pairs.reduceByKey{ case(left,right) => left+right}

/*
* Flatten the ((performer type,style),count) into
* (performer type,style,count)
*/
val aggregated:RDD[(String,String,Int)] = reduced.map{
case((performer_type,style),count) => (performer_type,style,count)
}


//Save data back to the performers_distribution_by_style table
aggregated.saveToCassandra(KEYSPACE, PERFORMERS_DISTRIBUTION_BY_STYLE, SomeColumns("type","style","count"))

sc.stop()
}

val PERFORMERS_TYPES = Map("Person"->"artist","Group"->"group")
}
78 changes: 78 additions & 0 deletions src/main/scala/analytics/music/Example3.scala
@@ -0,0 +1,78 @@
package analytics.music

import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd.CassandraRDD
import org.apache.spark.rdd.RDD
import analytics.music.Constants._
import analytics.music.Schema._

object Example3 extends BaseExample {

def main (args: Array[String]) {


val sc = buildSparkContext(EXAMPLE_3)

/*
* Read data from 'performers_distribution_by_style' table
*/
val artists:CassandraRDD[(String,String,Int)] = sc
.cassandraTable(KEYSPACE, PERFORMERS_DISTRIBUTION_BY_STYLE)
.select("type","style","count")
.as((_:String,_:String,_:Int))

val sortedPerformers:RDD[(String,String,Int)] = artists
.filter {case (_,style,_) => style != "Unknown"}
.sortBy[Int](tuple => tuple._3,false,1)

// Put the sorted RDD in cache for re-use
sortedPerformers.cache()

// Extract styles for groups
val groupsStyles: RDD[(String, String, Int)] = sortedPerformers
.filter {case(performer_type,_,_) => performer_type == "group"}

// Extract styles for artists
val artistStyles: RDD[(String, String, Int)] = sortedPerformers
.filter {case(performer_type,_,_) => performer_type == "artist"}

// Cache the groupStyles
groupsStyles.cache()

// Cache the artistStyles
artistStyles.cache()

// Sum the counts of all group styles that are not in the top 10
val otherStylesCountForGroup:Int = groupsStyles
.collect() //Fetch the whole RDD back to driver program
.drop(10) //Drop the first 10 top styles
.map{case(_,_,count)=>count} //Extract the count
.sum //Sum up the count

// Sum the counts of all artist styles that are not in the top 10
val otherStylesCountForArtist:Int = artistStyles
.collect() //Fetch the whole RDD back to driver program
.drop(10) //Drop the first 10 top styles
.map{case(_,_,count)=>count} //Extract the count
.sum //Sum up the count


// Take the top 10 styles for groups, with a count for all other styles
val top10Groups = groupsStyles.take(10) :+ ("group","Others",otherStylesCountForGroup)

// Take the top 10 styles for artists, with a count for all other styles
val top10Artists = artistStyles.take(10) :+ ("artist","Others",otherStylesCountForArtist)

/*
* Remark: by calling take(n), the first n elements are shipped back to the driver program;
* the output of take(n) is no longer an RDD but a plain Scala collection
*/

// Merge both lists and save back to Cassandra
sc.parallelize(top10Artists.toList ::: top10Groups.toList)
.saveToCassandra(KEYSPACE,TOP_10_STYLES,SomeColumns("type","style","count"))


sc.stop()
}
}
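The remaining example sources (notably Example4 and Example5 described in the README) are not rendered in this view. As an illustration only, a minimal sketch of the join described for **Example4** might look like the following; the object name, the decade label format and the plain RDD join are assumptions, not the committed code. **Example5** reportedly performs the same join through **SparkSQL** and additionally filters out countries with low release counts.

```scala
package analytics.music

import com.datastax.spark.connector._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import analytics.music.Constants._
import analytics.music.Schema._

// Hypothetical sketch only: the committed Example4.scala is not rendered in this diff.
// It follows the README description (join performers with albums and count albums
// per decade and origin country) using plain RDD operations.
object Example4Sketch extends BaseExample {

  def main (args: Array[String]) {

    val sc = buildSparkContext(EXAMPLE_4)

    // (performer name, origin country) pairs read from the performers table
    val performerCountries: RDD[(String, String)] = sc
      .cassandraTable(KEYSPACE, PERFORMERS)
      .select("name", "country")
      .as((_: String, _: String))

    // (performer name, release year) pairs read from the albums table
    // (assumes every album row carries a non-null year)
    val albumYears: RDD[(String, Int)] = sc
      .cassandraTable(KEYSPACE, ALBUMS)
      .select("performer", "year")
      .as((_: String, _: Int))

    // Join on the performer name, derive a decade label from the year,
    // then count albums per (decade, origin country)
    val counts: RDD[(String, String, Int)] = albumYears
      .join(performerCountries)
      .map { case (_, (year, country)) =>
        val start = (year / 10) * 10
        ((s"$start-${start + 9}", country), 1)
      }
      .reduceByKey(_ + _)
      .map { case ((decade, country), count) => (decade, country, count) }

    // Save back to the albums_by_decade_and_country table
    counts.saveToCassandra(KEYSPACE, ALBUMS_BY_DECADE_AND_COUNTRY,
      SomeColumns("decade", "country", "album_count"))

    sc.stop()
  }
}
```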
