Skip to content

Commit

Permalink
Add Twitter streaming example
Browse files Browse the repository at this point in the history
  • Loading branch information
doanduyhai committed Feb 4, 2015
1 parent e6ee018 commit bd5bc17
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 1 deletion.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ scalaVersion := "2.10.4"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.2.0" withSources() withJavadoc(),
"org.apache.spark" %% "spark-sql" % "1.2.0" withSources() withJavadoc(),
"org.apache.spark" %% "spark-streaming" % "1.2.0" withSources() withJavadoc(),
"org.apache.spark" %% "spark-streaming-twitter" % "1.2.0" withSources() withJavadoc(),
"com.datastax.spark" %% "spark-cassandra-connector" % "1.2.0-alpha1" withSources() withJavadoc(),
"com.github.nscala-time" %% "nscala-time" % "1.6.0" withSources() withJavadoc()
Expand Down
3 changes: 2 additions & 1 deletion src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ log4j.appender.console.layout.ConversionPattern=%d [%t] %-5p %c - %m%n

# Per-logger level overrides: keep CassandraRDD at INFO and silence the noisy loggers listed below.
log4j.logger.com.datastax.spark.connector.rdd.CassandraRDD=INFO
log4j.logger.org.apache.spark.storage.BlockManager=OFF
log4j.logger.org.apache.spark.storage.BlockManager=OFF
log4j.logger.twitter4j.StatusStreamImpl=OFF
24 changes: 24 additions & 0 deletions src/main/scala/twitter/stream/StreamingSchema.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package twitter.stream

import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.SparkConf

/**
 * Cassandra schema used by the Twitter streaming example.
 *
 * Holds the keyspace/table names and a helper that creates them when absent
 * and empties the table.
 */
object StreamingSchema {

  /** Keyspace that contains the streaming table. */
  val KEYSPACE: String = "spark_demo"

  /** Counter table with one row per (topic, interval) pair. */
  val TABLE: String = "twitter_stream"

  /**
   * Creates the keyspace and the counter table if they do not exist yet, then
   * truncates the table.
   *
   * @param conf Spark configuration handed to [[CassandraConnector]] to open the session
   */
  def prepareSchemaAndCleanData(conf: SparkConf): Unit = {
    CassandraConnector(conf).withSessionDo { session =>
      session.execute(s"CREATE KEYSPACE IF NOT EXISTS $KEYSPACE WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
      // `mentions` is a counter column; rows are clustered by interval, newest first.
      session.execute(
        s"""CREATE TABLE IF NOT EXISTS $KEYSPACE.$TABLE (
           |topic text,
           |interval text,
           |mentions counter,
           |PRIMARY KEY(topic, interval)
           |) WITH CLUSTERING ORDER BY (interval DESC)
           |""".stripMargin)
      // Start every run from an empty table.
      session.execute(s"TRUNCATE $KEYSPACE.$TABLE")
    }
  }
}
73 changes: 73 additions & 0 deletions src/main/scala/twitter/stream/TwitterStreaming.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package twitter.stream

import com.datastax.spark.connector.streaming._
import com.datastax.spark.connector.SomeColumns
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Time, StreamingContext, Seconds}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.twitter.TwitterUtils
import org.joda.time.{DateTime, DateTimeZone}
import twitter.stream.StreamingSchema.{KEYSPACE, TABLE}
import twitter4j.Status
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

/**
 * Example job: reads the Twitter stream, counts occurrences of a fixed set of
 * keywords per window and saves the counts to Cassandra.
 */
object TwitterStreaming {

  /*
   * Twitter OAuth credentials are read from environment variables first and,
   * when a variable is not set, from the matching -D system property:
   *   TWITTER_CONSUMER_KEY         or -Dtwitter4j.oauth.consumerKey="value"
   *   TWITTER_CONSUMER_SECRET      or -Dtwitter4j.oauth.consumerSecret="value"
   *   TWITTER_ACCESS_TOKEN         or -Dtwitter4j.oauth.accessToken="value"
   *   TWITTER_ACCESS_TOKEN_SECRET  or -Dtwitter4j.oauth.accessTokenSecret="value"
   */

  /** Streaming batch interval in seconds; also used as window length and slide interval. */
  val StreamingBatchInterval: Int = 5

  /** Lower-case words whose mentions are counted. */
  val Keywords: Seq[String] = Seq("love","you","hate","cat","lol")

  /**
   * Entry point: prepares the Cassandra schema, starts the streaming context
   * and blocks until it terminates.
   *
   * @param args command-line arguments (not used)
   * @throws NoSuchElementException if a Twitter credential is set neither as an
   *                                environment variable nor as a system property
   */
  def main (args: Array[String]): Unit = {

    val consumerKey: String = credential("TWITTER_CONSUMER_KEY", "twitter4j.oauth.consumerKey")
    val consumerSecret: String = credential("TWITTER_CONSUMER_SECRET", "twitter4j.oauth.consumerSecret")
    val accessToken: String = credential("TWITTER_ACCESS_TOKEN", "twitter4j.oauth.accessToken")
    val accessTokenSecret: String = credential("TWITTER_ACCESS_TOKEN_SECRET", "twitter4j.oauth.accessTokenSecret")

    val authorization: OAuthAuthorization = new OAuthAuthorization(new ConfigurationBuilder()
      .setOAuthConsumerKey(consumerKey)
      .setOAuthConsumerSecret(consumerSecret)
      .setOAuthAccessToken(accessToken)
      .setOAuthAccessTokenSecret(accessTokenSecret)
      .build)

    val conf = new SparkConf(true)
      .setAppName("stream_to_cassandra")
      .setMaster("local[4]")
      .set("spark.cassandra.connection.host", "localhost")

    StreamingSchema.prepareSchemaAndCleanData(conf)

    // One duration drives the batch interval, the window length and the slide,
    // so changing StreamingBatchInterval keeps all three consistent.
    val batchDuration = Seconds(StreamingBatchInterval)

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, batchDuration)

    val stream: ReceiverInputDStream[Status] = TwitterUtils.createStream(ssc, Some(authorization), Nil, StorageLevel.MEMORY_ONLY_SER_2)

    // Lower-case each tweet, split on whitespace, keep only the tracked keywords,
    // count them per window and tag every count with the batch time.
    stream.flatMap(_.getText.toLowerCase.split("""\s+"""))
      .filter(Keywords.contains(_))
      .countByValueAndWindow(batchDuration, batchDuration)
      .transform((rdd, time) => rdd.map { case (term, count) => (term, count, now(time))})
      .saveToCassandra(KEYSPACE, TABLE, SomeColumns("topic", "mentions", "interval"))

    ssc.checkpoint("/tmp/checkpoint")
    ssc.start()
    ssc.awaitTermination()

  }

  /**
   * Looks up one credential: the environment variable wins, the system property
   * is the fallback.
   *
   * @param envVar   name of the environment variable
   * @param property name of the JVM system property
   * @return the configured value
   * @throws NoSuchElementException naming both lookup keys when neither is set
   */
  private def credential(envVar: String, property: String): String =
    sys.env.get(envVar)
      .orElse(sys.props.get(property))
      .getOrElse(throw new NoSuchElementException(
        s"Missing Twitter credential: set environment variable $envVar or system property -D$property"))

  /**
   * Formats a batch time as a UTC string with the pattern `yyyyMMddHH:mm:ss.SSS`.
   *
   * NOTE(review): the pattern has no separator between day and hour; left as is
   * because the stored `interval` values depend on it. Confirm this is intended.
   */
  private def now(time: Time): String =
    new DateTime(time.milliseconds, DateTimeZone.UTC).toString("yyyyMMddHH:mm:ss.SSS")
}

0 comments on commit bd5bc17

Please sign in to comment.