
Commit

Code cleanup
doanduyhai committed Mar 15, 2015
1 parent 85f39f2 commit 5ebc662
Showing 6 changed files with 38 additions and 42 deletions.
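
In short: the Twitter streaming job now derives its window and slide durations from a single batchDuration value instead of hard-coded Seconds(5) literals, the unemployment demos import KEYSPACE and TABLE from UsUnemploymentSchema instead of fully qualifying them, unused imports are removed, a broadcast variable gets a more descriptive name, and a copy-pasted Spark app name and a timestamp format are corrected.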
56 changes: 28 additions & 28 deletions src/main/scala/twitter/stream/TwitterStreaming.scala
@@ -2,8 +2,8 @@ package twitter.stream
 
 import com.datastax.spark.connector.streaming._
 import com.datastax.spark.connector.SomeColumns
-import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
-import org.apache.spark.streaming.{Time, StreamingContext, Seconds}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.{Duration, Time, StreamingContext, Seconds}
 import org.apache.spark.{SparkContext, SparkConf}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.twitter.TwitterUtils
@@ -29,45 +29,45 @@ object TwitterStreaming {
 
   def main (args: Array[String]): Unit = {
 
-
     val consumerKey: String = sys.env.getOrElse("TWITTER_CONSUMER_KEY", sys.props("twitter4j.oauth.consumerKey"))
     val consumerSecret: String = sys.env.getOrElse("TWITTER_CONSUMER_SECRET", sys.props("twitter4j.oauth.consumerSecret"))
 
     val accessToken: String = sys.env.getOrElse("TWITTER_ACCESS_TOKEN", sys.props("twitter4j.oauth.accessToken"))
 
     val accessTokenSecret: String = sys.env.getOrElse("TWITTER_ACCESS_TOKEN_SECRET", sys.props("twitter4j.oauth.accessTokenSecret"))
 
     val authorization: OAuthAuthorization = new OAuthAuthorization(new ConfigurationBuilder()
       .setOAuthConsumerKey(consumerKey)
       .setOAuthConsumerSecret(consumerSecret)
       .setOAuthAccessToken(accessToken)
       .setOAuthAccessTokenSecret(accessTokenSecret)
       .build)
 
     val conf = new SparkConf(true)
       .setAppName("stream_to_cassandra")
       .setMaster("local[4]")
       .set("spark.cassandra.connection.host", "localhost")
 
     StreamingSchema.prepareSchemaAndCleanData(conf)
 
     val sc = new SparkContext(conf)
-    val ssc = new StreamingContext(sc, Seconds(StreamingBatchInterval))
+    val batchDuration: Duration = Seconds(StreamingBatchInterval)
+    val ssc = new StreamingContext(sc, batchDuration)
 
     val stream: ReceiverInputDStream[Status] = TwitterUtils.createStream(ssc, Some(authorization), Nil, StorageLevel.MEMORY_ONLY_SER_2)
 
     stream.flatMap(_.getText.toLowerCase.split("""\s+"""))
       .filter(Keywords.contains(_))
-      .countByValueAndWindow(Seconds(5), Seconds(5))
+      .countByValueAndWindow(batchDuration, batchDuration)
       .transform((rdd, time) => rdd.map { case (term, count) => (term, count, now(time))})
       .saveToCassandra(KEYSPACE, TABLE, SomeColumns("topic", "mentions", "interval"))
 
     ssc.checkpoint("/tmp/checkpoint")
     ssc.start()
     ssc.awaitTermination()
 
   }
 
   private def now(time: Time): String =
-    new DateTime(time.milliseconds, DateTimeZone.UTC).toString("yyyyMMddHH:mm:ss.SSS")
+    new DateTime(time.milliseconds, DateTimeZone.UTC).toString("yyyy-MM-dd HH:mm:ss")
 }
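
Two details in this file are easy to miss. With window duration == slide duration == batch duration, countByValueAndWindow counts each term exactly once per batch, so tying all three to one batchDuration value keeps them from drifting apart as the old Seconds(5) literals could. And windowed operators require a checkpoint directory, which is why ssc.checkpoint(...) is set before ssc.start(). A minimal, self-contained sketch of the same pattern (the object name, socket source, and checkpoint path below are stand-ins, not part of this commit):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

object WindowedCountSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("windowed_count_sketch").setMaster("local[2]")
    val batchDuration: Duration = Seconds(5)
    val ssc = new StreamingContext(conf, batchDuration)
    ssc.checkpoint("/tmp/sketch-checkpoint") // required by countByValueAndWindow

    // Stand-in source: feed it with e.g. `nc -lk 9999`
    val words = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.toLowerCase.split("""\s+"""))

    // window == slide == batch duration: every word is counted exactly once per batch
    words.countByValueAndWindow(batchDuration, batchDuration)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
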
src/main/scala/us/unemployment/demo/FromCSVCaseClassToCassandra.scala
@@ -3,6 +3,7 @@ package us.unemployment.demo
 
 import com.datastax.spark.connector._
 import org.apache.spark.{SparkConf, SparkContext}
+import us.unemployment.demo.UsUnemploymentSchema.{TABLE, KEYSPACE}
 
 object FromCSVCaseClassToCassandra {
 
@@ -43,7 +44,7 @@ object FromCSVCaseClassToCassandra {
         lines(4).toInt, lines(5).toDouble, lines(6).toInt, lines(7).toInt,
         lines(8).toInt, lines(9).toDouble, lines(10).toInt)
       }}
-      .saveToCassandra(UsUnemploymentSchema.KEYSPACE, UsUnemploymentSchema.TABLE)
+      .saveToCassandra(KEYSPACE, TABLE)
 
     sc.stop()
   }
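
The pattern here is the connector's case-class mapping: each CSV line becomes an instance of a case class and saveToCassandra matches its fields to column names, so no explicit column list is needed. A self-contained sketch of that pattern (the keyspace, table, and case class below are invented for illustration and assume a matching table already exists):

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

case class Unemployment(year: Int, rate: Double)

object SaveCaseClassSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true)
      .setAppName("save_case_class_sketch")
      .setMaster("local")
      .set("spark.cassandra.connection.host", "localhost")
    val sc = new SparkContext(conf)

    // In the real job these lines come from sc.textFile(...)
    sc.parallelize(Seq("2014,6.2", "2015,5.3"))
      .map(_.split(","))
      .map(cols => Unemployment(cols(0).toInt, cols(1).toDouble))
      .saveToCassandra("demo_ks", "unemployment") // columns matched by field name

    sc.stop()
  }
}
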
4 changes: 2 additions & 2 deletions src/main/scala/us/unemployment/demo/FromCSVToCassandra.scala
@@ -2,8 +2,8 @@ package us.unemployment.demo
 
 
-import com.datastax.spark.connector.{SomeColumns, _}
+import com.datastax.spark.connector.SomeColumns
 import org.apache.spark.{SparkConf, SparkContext}
 import us.unemployment.demo.UsUnemploymentSchema.{TABLE, KEYSPACE}
 
 object FromCSVToCassandra {
 
@@ -64,7 +64,7 @@ object FromCSVToCassandra {
         lines(4).toInt, lines(5).toDouble, lines(6).toInt, lines(7).toInt,
         lines(8).toInt, lines(9).toDouble, lines(10).toInt)
       }}
-      .saveToCassandra(UsUnemploymentSchema.KEYSPACE, UsUnemploymentSchema.TABLE, TABLE_COLUMNS)
+      .saveToCassandra(KEYSPACE, TABLE, TABLE_COLUMNS)
 
     sc.stop()
  }
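
This file is the tuple-based variant of the previous one: without a case class to supply field names, the column order has to be pinned explicitly with SomeColumns, which is why that import survives the cleanup. Sketched with the same invented keyspace and table as above:

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

object SaveTuplesSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true)
      .setAppName("save_tuples_sketch")
      .setMaster("local")
      .set("spark.cassandra.connection.host", "localhost")
    val sc = new SparkContext(conf)

    // Tuple elements are written to the columns in the order SomeColumns lists them
    sc.parallelize(Seq((2014, 6.2), (2015, 5.3)))
      .saveToCassandra("demo_ks", "unemployment", SomeColumns("year", "rate"))

    sc.stop()
  }
}
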
@@ -1,7 +1,6 @@
 package us.unemployment.demo
 
 import com.datastax.spark.connector._
-import com.datastax.spark.connector.rdd.CassandraRDD
 import org.apache.spark.{SparkConf, SparkContext}
 import us.unemployment.demo.UsUnemploymentSchema.{TABLE, KEYSPACE}
 
@@ -10,7 +10,7 @@ object FromCassandraToSQL {
   def main (args: Array[String]) {
 
     val conf = new SparkConf(true)
-      .setAppName("read_csv_from_cassandra_into_case_class")
+      .setAppName("read_data_from_cassandra_using_SQL")
      .setMaster("local")
      .set("spark.cassandra.connection.host", "localhost")
 
14 changes: 5 additions & 9 deletions src/main/scala/weather/data/demo/WeatherDataFromCassandra.scala
@@ -1,16 +1,12 @@
 package weather.data.demo
 
-import java.util.Date
 
 import com.datastax.spark.connector._
-import org.apache.spark.SparkContext._
 import com.github.nscala_time.time.Imports._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.{SparkConf, SparkContext}
-import org.joda.time.Seconds
 import weather.data.demo.WeatherDataSchema.{WEATHER_DATA_TABLE, WEATHER_STATION_TABLE, KEYSPACE}
 
-import scala.collection.immutable.TreeMap
 import scala.collection.{SortedMap, mutable, Map}
 
 object WeatherDataFromCassandra {
@@ -31,19 +27,19 @@ object WeatherDataFromCassandra {
     val frenchWeatherStations = sc.cassandraTable[WeatherStation](KEYSPACE, WEATHER_STATION_TABLE)
       .filter(_.countryCode == "FR").map(station => (station.id, station.name)).collectAsMap()
 
-    val stations: Broadcast[Map[String, String]] = sc.broadcast(frenchWeatherStations)
+    val frenchStationsMap: Broadcast[Map[String, String]] = sc.broadcast(frenchWeatherStations)
 
     val startTime = DateTime.now
 
     // Calculate average monthly temperature & pressure over all French stations, between March 2014 and June 2014
     val frenchSamples: Map[String, Sample] = sc.cassandraTable[MonthlySampleData](KEYSPACE, WEATHER_DATA_TABLE)
       .select("weather_station", "year", "month", "temperature", "pressure")
-      .filter(instance => stations.value.contains(instance.weatherStation)) // only French stations
+      .filter(instance => frenchStationsMap.value.contains(instance.weatherStation)) // only French stations
       .filter(instance => instance.month >= 3 && instance.month <= 6) // between March 2014 and June 2014
       .map(data => {
         val date = new mutable.StringBuilder().append(data.year).append("-").append(data.month.formatted("%02d")).toString()
         (date, Sample(data.temperature, data.pressure))
       })
      .mapValues { case sample => (sample, 1) } // add counter: (date, sample) becomes (date, (sample, 1))
      .reduceByKey { case ((sample1, count1), (sample2, count2)) => (sample1 + sample2, count1 + count2) } // sum temperature, pressure and occurrences
      .mapValues { case (sample, totalCount) => sample.avg(totalCount) } // average temperature & pressure
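
The aggregation here is the classic average-by-key pattern: pair each value with a count of 1, sum both with a single reduceByKey (one associative pass, no groupByKey), and divide once per key at the end. A toy, self-contained sketch (the Sample operations below are assumed to behave like this repo's Sample class):

import org.apache.spark.{SparkConf, SparkContext}

case class Sample(temperature: Double, pressure: Double) {
  def +(other: Sample): Sample = Sample(temperature + other.temperature, pressure + other.pressure)
  def avg(count: Int): Sample = Sample(temperature / count, pressure / count)
}

object AverageByKeySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("avg_by_key_sketch").setMaster("local"))

    val samples = sc.parallelize(Seq(
      ("2014-03", Sample(12.0, 1013.0)),
      ("2014-03", Sample(14.0, 1009.0)),
      ("2014-04", Sample(16.0, 1011.0))))

    samples
      .mapValues(sample => (sample, 1))                                // (date, (sample, 1))
      .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) } // sum samples and counts per date
      .mapValues { case (sum, count) => sum.avg(count) }               // one division per key
      .collect()
      .foreach(println)

    sc.stop()
  }
}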
