diff --git a/build.sbt b/build.sbt
index 505feeb..2e783d0 100644
--- a/build.sbt
+++ b/build.sbt
@@ -7,10 +7,11 @@ scalaVersion := "2.10.4"
 
 libraryDependencies ++= Seq(
-  "org.apache.spark" %% "spark-core" % "1.1.0" withSources() withJavadoc(),
-  "org.apache.spark" %% "spark-sql" % "1.1.0" withSources() withJavadoc(),
-  "org.apache.spark" %% "spark-streaming-twitter" % "1.1.0" withSources() withJavadoc(),
-  "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0-alpha2" withSources() withJavadoc()
+  "org.apache.spark" %% "spark-core" % "1.2.0" withSources() withJavadoc(),
+  "org.apache.spark" %% "spark-sql" % "1.2.0" withSources() withJavadoc(),
+  "org.apache.spark" %% "spark-streaming-twitter" % "1.2.0" withSources() withJavadoc(),
+  "com.datastax.spark" %% "spark-cassandra-connector" % "1.2.0-alpha1" withSources() withJavadoc(),
+  "com.github.nscala-time" %% "nscala-time" % "1.6.0" withSources() withJavadoc()
 )
diff --git a/src/main/data/weather_data_schema.cql b/src/main/data/weather_data_schema.cql
index 3168fc2..20b2952 100644
--- a/src/main/data/weather_data_schema.cql
+++ b/src/main/data/weather_data_schema.cql
@@ -27,8 +27,8 @@ CREATE TABLE IF NOT EXISTS weather_station (
 );
 
 TRUNCATE weather_station;
-CREATE INDEX weather_station_country ON weather_station (country_code);
-//COPY weather_station (id, name, country_code, state_code, call_sign, lat, long, elevation) FROM 'stations.csv';
+//CREATE INDEX weather_station_country ON weather_station (country_code);
+COPY weather_station (id, name, country_code, state_code, call_sign, lat, long, elevation) FROM 'stations.csv';
 
 /*
     Raw weather readings from a single station, hourly.
diff --git a/src/main/scala/us/unemployment/demo/FromCSVCaseClassToCassandra.scala b/src/main/scala/us/unemployment/demo/FromCSVCaseClassToCassandra.scala
index 42112ff..7d71581 100644
--- a/src/main/scala/us/unemployment/demo/FromCSVCaseClassToCassandra.scala
+++ b/src/main/scala/us/unemployment/demo/FromCSVCaseClassToCassandra.scala
@@ -26,7 +26,7 @@ object FromCSVCaseClassToCassandra {
 
     val conf = new SparkConf(true)
       .setAppName("write_csv_to_cassandra")
-      .setMaster("local[4]")
+      .setMaster("local")
       .set("spark.cassandra.connection.host", "localhost")
 
     val sc = new SparkContext(conf)
@@ -44,5 +44,7 @@ object FromCSVCaseClassToCassandra {
           lines(8).toInt, lines(9).toDouble, lines(10).toInt)
       }}
       .saveToCassandra(UsUnemploymentSchema.KEYSPACE, UsUnemploymentSchema.TABLE)
+
+    sc.stop()
   }
 }
diff --git a/src/main/scala/us/unemployment/demo/FromCSVToCassandra.scala b/src/main/scala/us/unemployment/demo/FromCSVToCassandra.scala
index d5bba69..a56c021 100644
--- a/src/main/scala/us/unemployment/demo/FromCSVToCassandra.scala
+++ b/src/main/scala/us/unemployment/demo/FromCSVToCassandra.scala
@@ -46,7 +46,7 @@ object FromCSVToCassandra {
 
     val conf = new SparkConf(true)
      .setAppName("write_csv_to_cassandra")
-      .setMaster("local[4]")
+      .setMaster("local")
       .set("spark.cassandra.connection.host", "localhost")
 
     val sc = new SparkContext(conf)
@@ -64,8 +64,10 @@ object FromCSVToCassandra {
           lines(8).toInt, lines(9).toDouble, lines(10).toInt)
       }}
       .saveToCassandra(UsUnemploymentSchema.KEYSPACE, UsUnemploymentSchema.TABLE, SomeColumns(TABLE_COLUMNS:_*))
+
+    sc.stop()
   }
-
 }
+
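Both loaders above write through UsUnemploymentSchema.KEYSPACE and UsUnemploymentSchema.TABLE, which this diff never shows. For readers following along, a minimal sketch of what that object presumably looks like (the constant values are assumptions, not taken from the repository):

    object UsUnemploymentSchema {
      val KEYSPACE = "us_unemployment" // assumed keyspace name
      val TABLE = "unemployment"       // assumed table name
    }
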
diff --git a/src/main/scala/us/unemployment/demo/FromCassandraToCaseClass.scala b/src/main/scala/us/unemployment/demo/FromCassandraToCaseClass.scala
index c2a7af1..eb68eb1 100644
--- a/src/main/scala/us/unemployment/demo/FromCassandraToCaseClass.scala
+++ b/src/main/scala/us/unemployment/demo/FromCassandraToCaseClass.scala
@@ -1,7 +1,9 @@
 package us.unemployment.demo
 
 import com.datastax.spark.connector._
+import com.datastax.spark.connector.rdd.CassandraRDD
 import org.apache.spark.{SparkConf, SparkContext}
+import us.unemployment.demo.UsUnemploymentSchema.{TABLE, KEYSPACE}
 
 object FromCassandraToCaseClass {
 
@@ -9,13 +11,15 @@ object FromCassandraToCaseClass {
 
     val conf = new SparkConf(true)
       .setAppName("read_csv_from_cassandra_into_case_class")
-      .setMaster("local[4]")
+      .setMaster("local")
       .set("spark.cassandra.connection.host", "localhost")
 
     val sc = new SparkContext(conf)
 
-    sc.cassandraTable[UsUnemployment](UsUnemploymentSchema.KEYSPACE, UsUnemploymentSchema.TABLE)
+    sc.cassandraTable[UsUnemployment](KEYSPACE, TABLE)
       .foreach(println(_))
+
+    sc.stop()
   }
diff --git a/src/main/scala/us/unemployment/demo/FromCassandraToRow.scala b/src/main/scala/us/unemployment/demo/FromCassandraToRow.scala
index 58fc98d..c152719 100644
--- a/src/main/scala/us/unemployment/demo/FromCassandraToRow.scala
+++ b/src/main/scala/us/unemployment/demo/FromCassandraToRow.scala
@@ -1,7 +1,10 @@
 package us.unemployment.demo
 
+import org.apache.spark.SparkContext._
 import com.datastax.spark.connector._
+import com.datastax.spark.connector.rdd.CassandraRDD
 import org.apache.spark.{SparkConf, SparkContext}
+import us.unemployment.demo.UsUnemploymentSchema.{TABLE, KEYSPACE}
 
 object FromCassandraToRow {
 
@@ -9,13 +12,13 @@ object FromCassandraToRow {
 
     val conf = new SparkConf(true)
       .setAppName("read_csv_from_cassandra_into_case_class")
-      .setMaster("local[4]")
+      .setMaster("local")
       .set("spark.cassandra.connection.host", "localhost")
 
     val sc = new SparkContext(conf)
 
-    sc.cassandraTable(UsUnemploymentSchema.KEYSPACE, UsUnemploymentSchema.TABLE)
+    sc.cassandraTable(KEYSPACE, TABLE)
       .foreach( row => {
         val year = row.getInt("year")
         val unemployedPercentageToLabor = row.getDouble("unemployed_percentage_to_labor")
@@ -23,6 +26,16 @@ object FromCassandraToRow {
       }
     )
 
+    // or
+    val unemployment: CassandraRDD[(String, Double)] = sc.cassandraTable(KEYSPACE, TABLE)
+      .select("year", "unemployed_percentage_to_labor").as((_: String, _: Double))
+    println(" ------------Alternative ----------------- ")
+
+    unemployment
+      .sortByKey()
+      .collect().foreach{case (year,percentage) => println(s""" Year($year), unemployment % : $percentage""")}
+
+    sc.stop()
   }
 }
diff --git a/src/main/scala/us/unemployment/demo/FromCassandraToSQL.scala b/src/main/scala/us/unemployment/demo/FromCassandraToSQL.scala
new file mode 100644
index 0000000..67955f8
--- /dev/null
+++ b/src/main/scala/us/unemployment/demo/FromCassandraToSQL.scala
@@ -0,0 +1,33 @@
+package us.unemployment.demo
+
+import org.apache.spark.sql.SchemaRDD
+import org.apache.spark.sql.cassandra.CassandraSQLContext
+import org.apache.spark.{SparkConf, SparkContext}
+import us.unemployment.demo.UsUnemploymentSchema.{KEYSPACE, TABLE}
+
+object FromCassandraToSQL {
+
+  def main (args: Array[String]) {
+
+    val conf = new SparkConf(true)
+      .setAppName("read_csv_from_cassandra_into_case_class")
+      .setMaster("local")
+      .set("spark.cassandra.connection.host", "localhost")
+
+    val sc = new SparkContext(conf)
+
+    val cc = new CassandraSQLContext(sc)
+
+    cc.setKeyspace(KEYSPACE)
+
+    val row: SchemaRDD = cc.cassandraSql(s"SELECT year,unemployed_percentage_to_labor " +
+      s"FROM $TABLE WHERE unemployed_percentage_to_labor > 8 " +
+      s"ORDER BY year DESC")
+
+    row.collect().foreach(row => println(s" Year(${row(0)}): ${row(1)}%"))
+
+    sc.stop()
+  }
+
+}
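FromCassandraToSQL pushes the projection and predicate into Spark SQL; the same query can also be written against the connector's RDD API shown in FromCassandraToRow. A sketch, assuming the same SparkContext `sc` and the KEYSPACE/TABLE constants (typing year as Int follows the row.getInt("year") call above):

    import com.datastax.spark.connector._
    import org.apache.spark.SparkContext._

    sc.cassandraTable(KEYSPACE, TABLE)
      .select("year", "unemployed_percentage_to_labor")
      .as((_: Int, _: Double))
      .filter { case (_, percentage) => percentage > 8 } // WHERE unemployed_percentage_to_labor > 8
      .sortByKey(ascending = false)                      // ORDER BY year DESC
      .collect()
      .foreach { case (year, percentage) => println(s" Year($year): $percentage%") }
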
diff --git a/src/main/scala/weather/data/demo/WeatherDataFromCassandra.scala b/src/main/scala/weather/data/demo/WeatherDataFromCassandra.scala
index 1d003da..2e97d8d 100644
--- a/src/main/scala/weather/data/demo/WeatherDataFromCassandra.scala
+++ b/src/main/scala/weather/data/demo/WeatherDataFromCassandra.scala
@@ -1,8 +1,17 @@
 package weather.data.demo
 
+import java.util.Date
+
 import com.datastax.spark.connector._
 import org.apache.spark.SparkContext._
+import com.github.nscala_time.time.Imports._
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.{SparkConf, SparkContext}
+import org.joda.time.Seconds
+import weather.data.demo.WeatherDataSchema.{WEATHER_DATA_TABLE, WEATHER_STATION_TABLE, KEYSPACE}
+
+import scala.collection.immutable.TreeMap
+import scala.collection.{SortedMap, mutable, Map}
 
 object WeatherDataFromCassandra {
 
@@ -11,7 +20,7 @@ object WeatherDataFromCassandra {
 
     val conf = new SparkConf(true)
       .setAppName("write_csv_to_cassandra")
-      .setMaster("local[4]")
+      .setMaster("local[2]")
       .set("spark.cassandra.input.page.row.size","10000")
       .set("spark.cassandra.input.split.size","1000000")
       .set("spark.cassandra.connection.host", "localhost")
@@ -19,30 +28,39 @@ object WeatherDataFromCassandra {
     val sc = new SparkContext(conf)
 
     // Fetch only FRENCH weather stations
-    val frenchWeatherStations = sc.cassandraTable[WeatherStation](WeatherDataSchema.KEYSPACE, WeatherDataSchema.WEATHER_STATION_TABLE)
+    val frenchWeatherStations = sc.cassandraTable[WeatherStation](KEYSPACE, WEATHER_STATION_TABLE)
       .filter(_.countryCode == "FR").map(station => (station.id, station.name)).collectAsMap()
 
+    val stations: Broadcast[Map[String, String]] = sc.broadcast(frenchWeatherStations)
+
+    val startTime = DateTime.now
+
     // Calculate average daily temperature & pressure over all French stations, between March 2014 and June 2014
-    val frenchSamples = sc.cassandraTable[MonthlySampleData](WeatherDataSchema.KEYSPACE, WeatherDataSchema.WEATHER_DATA_TABLE)
-      .select("weather_station","year","month","temperature","pressure")
-      .filter(instance => frenchWeatherStations.contains(instance.weatherStation)) //only French stations
+    val frenchSamples: Map[String, Sample] = sc.cassandraTable[MonthlySampleData](KEYSPACE, WEATHER_DATA_TABLE)
+      .select("weather_station", "year", "month", "temperature", "pressure")
+      .filter(instance => stations.value.contains(instance.weatherStation)) //only French stations
       .filter(instance => instance.month >= 3 && instance.month <= 6) // between March 2014 and June 2014
       .map(data => {
-        val date = new StringBuilder().append(data.year).append("-").append(data.month.formatted("%02d")).toString()
-        (date, Sample(data.temperature, data.pressure))
-      })
-      .mapValues{case sample => (sample,1)} // add counter. (date, sample) becomes (date, (sample,1))
-      .reduceByKey{ case ((sample1,count1),(sample2,count2)) => (sample1+sample2,count1+count2)} // sum temperature,pressure,occurence
-      .mapValues{case (sample,totalCount) => sample.avg(totalCount)} // average on temperature & pressure
-      .sortByKey()
+        val date = new mutable.StringBuilder().append(data.year).append("-").append(data.month.formatted("%02d")).toString()
+        (date, Sample(data.temperature, data.pressure))
+      })
+      .mapValues { case sample => (sample, 1)} // add counter. (date, sample) becomes (date, (sample,1))
+      .reduceByKey { case ((sample1, count1), (sample2, count2)) => (sample1 + sample2, count1 + count2)} // sum temperature,pressure,occurrence
+      .mapValues { case (sample, totalCount) => sample.avg(totalCount)} // average on temperature & pressure
       .collectAsMap()
+
+
     println("")
     println("/************************************ RESULT *******************************/")
     println("")
-    println(frenchSamples)
+    println((SortedMap.empty[String, Sample] ++ frenchSamples))
     println("")
     println("/***************************************************************************/")
 
+    val endTime = DateTime.now
+
+    println(s"Job start time : $startTime, end time : $endTime")
+
   }
 
   case class Sample(temperature:Float,pressure:Float) {
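WeatherDataFromCassandra above (and WeatherDataIntoCassandra below) switch small lookup maps to Broadcast variables read through .value, so the map is shipped once per executor instead of being serialized into every task closure. A self-contained sketch of the pattern (the station entries are hypothetical, just to make it runnable):

    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.{SparkConf, SparkContext}

    object BroadcastSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext(new SparkConf(true).setAppName("broadcast_sketch").setMaster("local[2]"))

        // small lookup map, broadcast once to every executor
        val stations = Map("07150" -> "PARIS-LE BOURGET", "07156" -> "PARIS-ORLY") // hypothetical entries
        val stationsBc: Broadcast[Map[String, String]] = sc.broadcast(stations)

        val readings = sc.parallelize(Seq(("07150", 12.5f), ("99999", 3.0f)))
        readings
          .filter { case (id, _) => stationsBc.value.contains(id) } // executors read the shared copy via .value
          .foreach(println)

        sc.stop()
      }
    }
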
diff --git a/src/main/scala/weather/data/demo/WeatherDataIntoCassandra.scala b/src/main/scala/weather/data/demo/WeatherDataIntoCassandra.scala
index 9a0eca1..23aa5d2 100644
--- a/src/main/scala/weather/data/demo/WeatherDataIntoCassandra.scala
+++ b/src/main/scala/weather/data/demo/WeatherDataIntoCassandra.scala
@@ -1,11 +1,13 @@
 package weather.data.demo
 
 import com.datastax.spark.connector.{SomeColumns, _}
+import com.github.nscala_time.time.Imports._
 import org.apache.spark.{SparkConf, SparkContext}
+import org.joda.time.Period
 
 object WeatherDataIntoCassandra {
 
-  val WEATHER_2014_CSV: String = "/path/to/2014.csv"
+  val WEATHER_2014_CSV: String = "/Users/archinnovinfo/perso/spark_data/Weather Data 2014.csv"
 
   val TABLE_COLUMNS = Seq("weather_station", "year", "month", "day", "hour", "temperature", "dewpoint", "pressure",
     "wind_direction", "wind_speed", "sky_condition", "sky_condition_text", "one_hour_precip", "six_hour_precip")
@@ -15,7 +17,7 @@ object WeatherDataIntoCassandra {
 
     val conf = new SparkConf(true)
       .setAppName("write_csv_to_cassandra")
-      .setMaster("local[4]")
+      .setMaster("local[2]")
       .set("spark.cassandra.connection.host", "localhost")
 
     val sc = new SparkContext(conf)
@@ -23,6 +25,9 @@ object WeatherDataIntoCassandra {
     val skyConditions = sc.cassandraTable[SkyCondition](WeatherDataSchema.KEYSPACE, WeatherDataSchema.SKY_CONDITION_TABLE).collect()
       .map(instance => (instance.code, instance.condition)).toMap;
 
+    val skyConditionBc = sc.broadcast(skyConditions)
+
+    val startTime = DateTime.now
 
     // Example of raw data
     // 010010:99999,
@@ -36,11 +41,18 @@ object WeatherDataIntoCassandra {
         (lines(0), lines(1).toInt, lines(2).toInt, lines(3).toInt, lines(4).toInt,
           lines(5).toFloat, lines(6).toFloat, lines(7).toFloat, lines(8).toFloat.toInt, lines(9).toFloat,
-          skyConditionCode, skyConditions.get(skyConditionCode), lines(11).toFloat, lines(12).toFloat)
+          skyConditionCode, skyConditionBc.value.get(skyConditionCode), lines(11).toFloat, lines(12).toFloat)
       }}
       .saveToCassandra(WeatherDataSchema.KEYSPACE, WeatherDataSchema.WEATHER_DATA_TABLE, SomeColumns(TABLE_COLUMNS:_*))
+
+    val endTime = DateTime.now
+
+    val period: Period = new Period(startTime, endTime)
+
+    println(s"Job duration (sec) : ${period.getSeconds}")
   }
 
+  case class SkyCondition(code:Int, condition:String)
 }
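One caveat on the timing added in WeatherDataIntoCassandra: joda-time's period.getSeconds returns only the seconds field of the Period, so a run of 1 minute 30 seconds would print 30. WeatherDataFromCassandra already imports org.joda.time.Seconds, which counts the whole interval; a sketch of that variant, reusing the startTime/endTime values from the job:

    import org.joda.time.{DateTime, Seconds}

    val startTime = DateTime.now
    // ... run the Spark job ...
    val endTime = DateTime.now

    // secondsBetween measures the full interval, not just the seconds field of a Period
    println(s"Job duration (sec) : ${Seconds.secondsBetween(startTime, endTime).getSeconds}")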