From 5ebc662adda33f21e46b52f5448ab13ff22e8fd0 Mon Sep 17 00:00:00 2001
From: DuyHai DOAN
Date: Wed, 4 Feb 2015 19:04:27 +0100
Subject: [PATCH] Code cleanup

---
 .../twitter/stream/TwitterStreaming.scala     | 56 +++++++++----------
 .../demo/FromCSVCaseClassToCassandra.scala    |  3 +-
 .../demo/FromCSVToCassandra.scala             |  4 +-
 .../demo/FromCassandraToCaseClass.scala       |  1 -
 .../demo/FromCassandraToSQL.scala             |  2 +-
 .../data/demo/WeatherDataFromCassandra.scala  | 14 ++---
 6 files changed, 38 insertions(+), 42 deletions(-)

diff --git a/src/main/scala/twitter/stream/TwitterStreaming.scala b/src/main/scala/twitter/stream/TwitterStreaming.scala
index ba429c0..0a5de37 100644
--- a/src/main/scala/twitter/stream/TwitterStreaming.scala
+++ b/src/main/scala/twitter/stream/TwitterStreaming.scala
@@ -2,8 +2,8 @@ package twitter.stream
 
 import com.datastax.spark.connector.streaming._
 import com.datastax.spark.connector.SomeColumns
-import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
-import org.apache.spark.streaming.{Time, StreamingContext, Seconds}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.{Duration, Time, StreamingContext, Seconds}
 import org.apache.spark.{SparkContext, SparkConf}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.twitter.TwitterUtils
@@ -29,45 +29,45 @@ object TwitterStreaming {
 
   def main (args: Array[String]): Unit = {
-
 
     val consumerKey: String = sys.env.getOrElse("TWITTER_CONSUMER_KEY", sys.props("twitter4j.oauth.consumerKey"))
 
     val consumerSecret: String = sys.env.getOrElse("TWITTER_CONSUMER_SECRET", sys.props("twitter4j.oauth.consumerSecret"))
 
-    val accessToken: String = sys.env.getOrElse("TWITTER_ACCESS_TOKEN", sys.props("twitter4j.oauth.accessToken"))
+    val accessToken: String = sys.env.getOrElse("TWITTER_ACCESS_TOKEN", sys.props("twitter4j.oauth.accessToken"))
 
-    val accessTokenSecret: String = sys.env.getOrElse("TWITTER_ACCESS_TOKEN_SECRET", sys.props("twitter4j.oauth.accessTokenSecret"))
+    val accessTokenSecret: String = sys.env.getOrElse("TWITTER_ACCESS_TOKEN_SECRET", sys.props("twitter4j.oauth.accessTokenSecret"))
 
-    val authorization: OAuthAuthorization = new OAuthAuthorization(new ConfigurationBuilder()
-      .setOAuthConsumerKey(consumerKey)
-      .setOAuthConsumerSecret(consumerSecret)
-      .setOAuthAccessToken(accessToken)
-      .setOAuthAccessTokenSecret(accessTokenSecret)
-      .build)
+    val authorization: OAuthAuthorization = new OAuthAuthorization(new ConfigurationBuilder()
+      .setOAuthConsumerKey(consumerKey)
+      .setOAuthConsumerSecret(consumerSecret)
+      .setOAuthAccessToken(accessToken)
+      .setOAuthAccessTokenSecret(accessTokenSecret)
+      .build)
 
-    val conf = new SparkConf(true)
-      .setAppName("stream_to_cassandra")
-      .setMaster("local[4]")
-      .set("spark.cassandra.connection.host", "localhost")
+    val conf = new SparkConf(true)
+      .setAppName("stream_to_cassandra")
+      .setMaster("local[4]")
+      .set("spark.cassandra.connection.host", "localhost")
 
-    StreamingSchema.prepareSchemaAndCleanData(conf)
+    StreamingSchema.prepareSchemaAndCleanData(conf)
 
-    val sc = new SparkContext(conf)
-    val ssc = new StreamingContext(sc, Seconds(StreamingBatchInterval))
+    val sc = new SparkContext(conf)
+    val batchDuration: Duration = Seconds(StreamingBatchInterval)
+    val ssc = new StreamingContext(sc, batchDuration)
 
-    val stream: ReceiverInputDStream[Status] = TwitterUtils.createStream(ssc, Some(authorization), Nil, StorageLevel.MEMORY_ONLY_SER_2)
+    val stream: ReceiverInputDStream[Status] = TwitterUtils.createStream(ssc, Some(authorization), Nil, StorageLevel.MEMORY_ONLY_SER_2)
 
-    stream.flatMap(_.getText.toLowerCase.split("""\s+"""))
-      .filter(Keywords.contains(_))
-      .countByValueAndWindow(Seconds(5), Seconds(5))
-      .transform((rdd, time) => rdd.map { case (term, count) => (term, count, now(time))})
-      .saveToCassandra(KEYSPACE, TABLE, SomeColumns("topic", "mentions", "interval"))
+    stream.flatMap(_.getText.toLowerCase.split("""\s+"""))
+      .filter(Keywords.contains(_))
+      .countByValueAndWindow(batchDuration, batchDuration)
+      .transform((rdd, time) => rdd.map { case (term, count) => (term, count, now(time))})
+      .saveToCassandra(KEYSPACE, TABLE, SomeColumns("topic", "mentions", "interval"))
 
-    ssc.checkpoint("/tmp/checkpoint")
-    ssc.start()
-    ssc.awaitTermination()
+    ssc.checkpoint("/tmp/checkpoint")
+    ssc.start()
+    ssc.awaitTermination()
   }
 
   private def now(time: Time): String =
-    new DateTime(time.milliseconds, DateTimeZone.UTC).toString("yyyyMMddHH:mm:ss.SSS")
+    new DateTime(time.milliseconds, DateTimeZone.UTC).toString("yyyy-MM-dd HH:mm:ss")
 }
diff --git a/src/main/scala/us/unemployment/demo/FromCSVCaseClassToCassandra.scala b/src/main/scala/us/unemployment/demo/FromCSVCaseClassToCassandra.scala
index 7d71581..2bad6eb 100644
--- a/src/main/scala/us/unemployment/demo/FromCSVCaseClassToCassandra.scala
+++ b/src/main/scala/us/unemployment/demo/FromCSVCaseClassToCassandra.scala
@@ -3,6 +3,7 @@ package us.unemployment.demo
 
 import com.datastax.spark.connector._
 import org.apache.spark.{SparkConf, SparkContext}
+import us.unemployment.demo.UsUnemploymentSchema.{TABLE, KEYSPACE}
 
 object FromCSVCaseClassToCassandra {
 
@@ -43,7 +44,7 @@ object FromCSVCaseClassToCassandra {
         lines(4).toInt, lines(5).toDouble, lines(6).toInt, lines(7).toInt,
         lines(8).toInt, lines(9).toDouble, lines(10).toInt)
       }}
-      .saveToCassandra(UsUnemploymentSchema.KEYSPACE, UsUnemploymentSchema.TABLE)
+      .saveToCassandra(KEYSPACE, TABLE)
 
     sc.stop()
   }
diff --git a/src/main/scala/us/unemployment/demo/FromCSVToCassandra.scala b/src/main/scala/us/unemployment/demo/FromCSVToCassandra.scala
index eb6dd3e..9f49153 100644
--- a/src/main/scala/us/unemployment/demo/FromCSVToCassandra.scala
+++ b/src/main/scala/us/unemployment/demo/FromCSVToCassandra.scala
@@ -2,8 +2,8 @@ package us.unemployment.demo
 
 import com.datastax.spark.connector.{SomeColumns, _}
-import com.datastax.spark.connector.SomeColumns
 import org.apache.spark.{SparkConf, SparkContext}
+import us.unemployment.demo.UsUnemploymentSchema.{TABLE, KEYSPACE}
 
 object FromCSVToCassandra {
 
@@ -64,7 +64,7 @@ object FromCSVToCassandra {
         lines(4).toInt, lines(5).toDouble, lines(6).toInt, lines(7).toInt,
         lines(8).toInt, lines(9).toDouble, lines(10).toInt)
       }}
-      .saveToCassandra(UsUnemploymentSchema.KEYSPACE, UsUnemploymentSchema.TABLE, TABLE_COLUMNS)
+      .saveToCassandra(KEYSPACE, TABLE, TABLE_COLUMNS)
 
     sc.stop()
   }
diff --git a/src/main/scala/us/unemployment/demo/FromCassandraToCaseClass.scala b/src/main/scala/us/unemployment/demo/FromCassandraToCaseClass.scala
index eb68eb1..ba38031 100644
--- a/src/main/scala/us/unemployment/demo/FromCassandraToCaseClass.scala
+++ b/src/main/scala/us/unemployment/demo/FromCassandraToCaseClass.scala
@@ -1,7 +1,6 @@
 package us.unemployment.demo
 
 import com.datastax.spark.connector._
-import com.datastax.spark.connector.rdd.CassandraRDD
 import org.apache.spark.{SparkConf, SparkContext}
 import us.unemployment.demo.UsUnemploymentSchema.{TABLE, KEYSPACE}
 
diff --git a/src/main/scala/us/unemployment/demo/FromCassandraToSQL.scala b/src/main/scala/us/unemployment/demo/FromCassandraToSQL.scala
index 67955f8..3efa762 100644
--- a/src/main/scala/us/unemployment/demo/FromCassandraToSQL.scala
+++ b/src/main/scala/us/unemployment/demo/FromCassandraToSQL.scala
@@ -10,7 +10,7 @@ object FromCassandraToSQL {
 
   def main (args: Array[String]) {
 
     val conf = new SparkConf(true)
-      .setAppName("read_csv_from_cassandra_into_case_class")
+      .setAppName("read_data_from_cassandra_using_SQL")
       .setMaster("local")
       .set("spark.cassandra.connection.host", "localhost")
diff --git a/src/main/scala/weather/data/demo/WeatherDataFromCassandra.scala b/src/main/scala/weather/data/demo/WeatherDataFromCassandra.scala
index 2e97d8d..10dad82 100644
--- a/src/main/scala/weather/data/demo/WeatherDataFromCassandra.scala
+++ b/src/main/scala/weather/data/demo/WeatherDataFromCassandra.scala
@@ -1,16 +1,12 @@
 package weather.data.demo
 
-import java.util.Date
-
 import com.datastax.spark.connector._
 import org.apache.spark.SparkContext._
 import com.github.nscala_time.time.Imports._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.{SparkConf, SparkContext}
-import org.joda.time.Seconds
 import weather.data.demo.WeatherDataSchema.{WEATHER_DATA_TABLE, WEATHER_STATION_TABLE, KEYSPACE}
 
-import scala.collection.immutable.TreeMap
 import scala.collection.{SortedMap, mutable, Map}
 
 object WeatherDataFromCassandra {
@@ -31,19 +27,19 @@ object WeatherDataFromCassandra {
     val frenchWeatherStations = sc.cassandraTable[WeatherStation](KEYSPACE, WEATHER_STATION_TABLE)
       .filter(_.countryCode == "FR").map(station => (station.id, station.name)).collectAsMap()
 
-    val stations: Broadcast[Map[String, String]] = sc.broadcast(frenchWeatherStations)
+    val frenchStationsMap: Broadcast[Map[String, String]] = sc.broadcast(frenchWeatherStations)
 
     val startTime = DateTime.now
 
     // Calculate average daily temperature & pressure over all French stations, between March 2014 and June 2014
     val frenchSamples: Map[String, Sample] = sc.cassandraTable[MonthlySampleData](KEYSPACE, WEATHER_DATA_TABLE)
       .select("weather_station", "year", "month", "temperature", "pressure")
-      .filter(instance => stations.value.contains(instance.weatherStation)) //only French stations
+      .filter(instance => frenchStationsMap.value.contains(instance.weatherStation)) //only French stations
       .filter(instance => instance.month >= 3 && instance.month <= 6) // between March 2014 and June 2014
       .map(data => {
-        val date = new mutable.StringBuilder().append(data.year).append("-").append(data.month.formatted("%02d")).toString()
-        (date, Sample(data.temperature, data.pressure))
-      })
+        val date = new mutable.StringBuilder().append(data.year).append("-").append(data.month.formatted("%02d")).toString()
+        (date, Sample(data.temperature, data.pressure))
+      })
       .mapValues { case sample => (sample, 1)} // add counter. (date, sample) becomes (date, (sample,1))
      .reduceByKey { case ((sample1, count1), (sample2, count2)) => (sample1 + sample2, count1 + count2)} // sum temperature,pressure,occurence
       .mapValues { case (sample, totalCount) => sample.avg(totalCount)} // average on temperature & pressure
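
Note on the windowing change in TwitterStreaming.scala: the old code hard-coded countByValueAndWindow(Seconds(5), Seconds(5)) while the StreamingContext was built from StreamingBatchInterval. Spark Streaming requires window and slide durations to be multiples of the batch interval, so deriving all three from a single batchDuration value removes a latent runtime failure whenever StreamingBatchInterval does not divide 5 seconds. Below is a minimal, self-contained sketch of that pattern; the object name, host, and port are illustrative, and socketTextStream stands in for the Twitter receiver (any line-oriented source shows the same behaviour).

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

object WindowedCountSketch {

  def main(args: Array[String]): Unit = {

    // local[2]: a receiver occupies one core, so at least two are needed locally
    val conf = new SparkConf(true)
      .setAppName("windowed_count_sketch")
      .setMaster("local[2]")

    // Derive every duration from one value: window and slide intervals
    // must be multiples of the batch interval, or Spark Streaming
    // rejects the windowed DStream at validation time.
    val batchDuration: Duration = Seconds(5)
    val ssc = new StreamingContext(conf, batchDuration)

    // socketTextStream is a stand-in for the Twitter receiver
    val words = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.toLowerCase.split("""\s+"""))

    // Tumbling window: window length == slide interval == one batch,
    // so every word is counted exactly once per window
    words.countByValueAndWindow(batchDuration, batchDuration)
      .print()

    // countByValueAndWindow uses an inverse reduce internally,
    // which requires a checkpoint directory
    ssc.checkpoint("/tmp/checkpoint")
    ssc.start()
    ssc.awaitTermination()
  }
}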