diff --git a/build.sbt b/build.sbt index 2e783d0..876a3b4 100644 --- a/build.sbt +++ b/build.sbt @@ -9,6 +9,7 @@ scalaVersion := "2.10.4" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "1.2.0" withSources() withJavadoc(), "org.apache.spark" %% "spark-sql" % "1.2.0" withSources() withJavadoc(), + "org.apache.spark" %% "spark-streaming" % "1.2.0" withSources() withJavadoc(), "org.apache.spark" %% "spark-streaming-twitter" % "1.2.0" withSources() withJavadoc(), "com.datastax.spark" %% "spark-cassandra-connector" % "1.2.0-alpha1" withSources() withJavadoc(), "com.github.nscala-time" %% "nscala-time" % "1.6.0" withSources() withJavadoc() diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties index d6e68d9..bf48a21 100644 --- a/src/main/resources/log4j.properties +++ b/src/main/resources/log4j.properties @@ -7,4 +7,5 @@ log4j.appender.console.layout.ConversionPattern=%d [%t] %-5p %c - %m%n # Print only messages of level WARN or above in the package com.foo. 
log4j.logger.com.datastax.spark.connector.rdd.CassandraRDD=INFO
-log4j.logger.org.apache.spark.storage.BlockManager=OFF
\ No newline at end of file
+log4j.logger.org.apache.spark.storage.BlockManager=OFF
+log4j.logger.twitter4j.StatusStreamImpl=OFF
\ No newline at end of file
diff --git a/src/main/scala/twitter/stream/StreamingSchema.scala b/src/main/scala/twitter/stream/StreamingSchema.scala
new file mode 100644
index 0000000..9f79182
--- /dev/null
+++ b/src/main/scala/twitter/stream/StreamingSchema.scala
@@ -0,0 +1,34 @@
+package twitter.stream
+
+import com.datastax.spark.connector.cql.CassandraConnector
+import org.apache.spark.SparkConf
+
+/** Cassandra keyspace/table names and schema setup for the Twitter streaming demo. */
+object StreamingSchema {
+
+  /** Keyspace that holds the demo table. */
+  val KEYSPACE: String = "spark_demo"
+
+  /** Counter table keyed by (topic, interval). */
+  val TABLE: String = "twitter_stream"
+
+  /**
+   * Creates the keyspace and the counter table if they do not exist yet, then
+   * truncates the table, so any previously stored counts are removed.
+   *
+   * @param conf Spark configuration handed to `CassandraConnector` to open the session
+   */
+  def prepareSchemaAndCleanData(conf: SparkConf): Unit = {
+    CassandraConnector(conf).withSessionDo { session =>
+      session.execute(s"CREATE KEYSPACE IF NOT EXISTS $KEYSPACE WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
+      session.execute( s"""CREATE TABLE IF NOT EXISTS $KEYSPACE.$TABLE (
+          topic text,
+          interval text,
+          mentions counter,
+          PRIMARY KEY(topic, interval)
+      ) WITH CLUSTERING ORDER BY (interval DESC)
+      """)
+      session.execute(s"TRUNCATE $KEYSPACE.$TABLE")
+    }
+  }
+}
diff --git a/src/main/scala/twitter/stream/TwitterStreaming.scala b/src/main/scala/twitter/stream/TwitterStreaming.scala
new file mode 100644
index 0000000..ba429c0
--- /dev/null
+++ b/src/main/scala/twitter/stream/TwitterStreaming.scala
@@ -0,0 +1,73 @@
+package twitter.stream
+
+import com.datastax.spark.connector.streaming._
+import com.datastax.spark.connector.SomeColumns
+import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
+import org.apache.spark.streaming.{Time, StreamingContext, Seconds}
+import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.twitter.TwitterUtils
+import org.joda.time.{DateTime, DateTimeZone}
+import
twitter.stream.StreamingSchema.{KEYSPACE, TABLE}
+import twitter4j.Status
+import twitter4j.auth.OAuthAuthorization
+import twitter4j.conf.ConfigurationBuilder
+
+/**
+ * Demo job: splits incoming tweets into lower-cased words, counts the occurrences
+ * of [[TwitterStreaming.Keywords]] per window and saves the counts to Cassandra.
+ */
+object TwitterStreaming {
+
+  /*
+   * Twitter credentials are looked up in environment variables first and fall
+   * back to -D system properties:
+   *   TWITTER_CONSUMER_KEY        or -Dtwitter4j.oauth.consumerKey="value"
+   *   TWITTER_CONSUMER_SECRET     or -Dtwitter4j.oauth.consumerSecret="value"
+   *   TWITTER_ACCESS_TOKEN        or -Dtwitter4j.oauth.accessToken="value"
+   *   TWITTER_ACCESS_TOKEN_SECRET or -Dtwitter4j.oauth.accessTokenSecret="value"
+   */
+
+  /** Batch interval in seconds; also used as the window length and slide interval. */
+  val StreamingBatchInterval: Int = 5
+
+  /** Lower-case words whose mentions are counted. */
+  val Keywords: Seq[String] = Seq("love","you","hate","cat","lol")
+
+  /**
+   * Prepares the Cassandra schema, builds the streaming pipeline and blocks until
+   * the streaming context terminates.
+   *
+   * @param args command-line arguments (not used)
+   * @throws RuntimeException if a Twitter credential is set neither as an
+   *                          environment variable nor as a system property
+   */
+  def main(args: Array[String]): Unit = {
+    val consumerKey: String = credential("TWITTER_CONSUMER_KEY", "twitter4j.oauth.consumerKey")
+    val consumerSecret: String = credential("TWITTER_CONSUMER_SECRET", "twitter4j.oauth.consumerSecret")
+    val accessToken: String = credential("TWITTER_ACCESS_TOKEN", "twitter4j.oauth.accessToken")
+    val accessTokenSecret: String = credential("TWITTER_ACCESS_TOKEN_SECRET", "twitter4j.oauth.accessTokenSecret")
+
+    val authorization: OAuthAuthorization = new OAuthAuthorization(new ConfigurationBuilder()
+      .setOAuthConsumerKey(consumerKey)
+      .setOAuthConsumerSecret(consumerSecret)
+      .setOAuthAccessToken(accessToken)
+      .setOAuthAccessTokenSecret(accessTokenSecret)
+      .build)
+
+    val conf = new SparkConf(true)
+      .setAppName("stream_to_cassandra")
+      .setMaster("local[4]")
+      .set("spark.cassandra.connection.host", "localhost")
+
+    StreamingSchema.prepareSchemaAndCleanData(conf)
+
+    // Single source of truth for batch, window and slide durations, so the window
+    // always stays aligned with the batch interval when the constant is changed.
+    val batchDuration = Seconds(StreamingBatchInterval)
+
+    val sc = new SparkContext(conf)
+    val ssc = new StreamingContext(sc, batchDuration)
+
+    // No filter terms are passed to the receiver (Nil); keyword filtering happens below.
+    val stream: ReceiverInputDStream[Status] = TwitterUtils.createStream(ssc, Some(authorization), Nil, StorageLevel.MEMORY_ONLY_SER_2)
+
+    stream.flatMap(_.getText.toLowerCase.split("""\s+"""))
+      .filter(Keywords.contains(_))
+      .countByValueAndWindow(batchDuration, batchDuration)
+      .transform((rdd, time) => rdd.map { case (term, count) => (term, count, now(time))})
+      .saveToCassandra(KEYSPACE, TABLE, SomeColumns("topic", "mentions", "interval"))
+
+    // countByValueAndWindow requires checkpointing to be enabled before the context starts.
+    ssc.checkpoint("/tmp/checkpoint")
+    ssc.start()
+    ssc.awaitTermination()
+  }
+
+  /**
+   * Resolves one Twitter credential, preferring the environment variable over the
+   * system property.
+   *
+   * Fails fast when neither is set: `sys.props(key)` yields null for an unset
+   * property, which would otherwise be handed to twitter4j as a null credential.
+   */
+  private def credential(envKey: String, propKey: String): String =
+    sys.env.get(envKey)
+      .orElse(sys.props.get(propKey))
+      .getOrElse(sys.error(
+        s"Missing Twitter credential: set environment variable $envKey or system property -D$propKey"))
+
+  /**
+   * Formats the given batch time as the UTC string stored in the `interval` column.
+   * NOTE(review): the pattern has no separator between day and hour - confirm this is intended.
+   */
+  private def now(time: Time): String =
+    new DateTime(time.milliseconds, DateTimeZone.UTC).toString("yyyyMMddHH:mm:ss.SSS")
+}