From 7d8fafd16defff7b3cc0ebdd3b7a92b29e3af437 Mon Sep 17 00:00:00 2001 From: DuyHai DOAN Date: Wed, 18 Mar 2015 14:56:40 +0100 Subject: [PATCH] Improve the Twitter streaming example --- .../scala/twitter/stream/StreamingSchema.scala | 6 +++--- .../scala/twitter/stream/TwitterStreaming.scala | 17 ++++++++++++++--- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/src/main/scala/twitter/stream/StreamingSchema.scala b/src/main/scala/twitter/stream/StreamingSchema.scala index 9f79182..440cda5 100644 --- a/src/main/scala/twitter/stream/StreamingSchema.scala +++ b/src/main/scala/twitter/stream/StreamingSchema.scala @@ -12,10 +12,10 @@ object StreamingSchema { CassandraConnector(conf).withSessionDo { session => session.execute(s"CREATE KEYSPACE IF NOT EXISTS $KEYSPACE WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }") session.execute( s"""CREATE TABLE IF NOT EXISTS $KEYSPACE.$TABLE ( - topic text, + keyword text, interval text, - mentions counter, - PRIMARY KEY(topic, interval) + count counter, + PRIMARY KEY(keyword, interval) ) WITH CLUSTERING ORDER BY (interval DESC) """) session.execute(s"TRUNCATE $KEYSPACE.$TABLE") diff --git a/src/main/scala/twitter/stream/TwitterStreaming.scala b/src/main/scala/twitter/stream/TwitterStreaming.scala index aa2cdd7..069cb2b 100644 --- a/src/main/scala/twitter/stream/TwitterStreaming.scala +++ b/src/main/scala/twitter/stream/TwitterStreaming.scala @@ -25,7 +25,7 @@ object TwitterStreaming { val StreamingBatchInterval = 5 - val Keywords = Seq("love","hate","lol","cat") + val Keywords = Seq("love","hate",":-)",":)",":-(",":(") def main (args: Array[String]): Unit = { @@ -47,8 +47,8 @@ object TwitterStreaming { stream.flatMap(_.getText.toLowerCase.split("""\s+""")) .filter(Keywords.contains(_)) .countByValueAndWindow(batchDuration, batchDuration) - .transform((rdd, time) => rdd.map { case (keyword, count) => (keyword, count, now(time))}) - .saveToCassandra(KEYSPACE, TABLE, SomeColumns("topic", "mentions", "interval")) + .transform((rdd, time) => rdd.map { case (keyword, count) => (replaceSmiley(keyword), count, now(time))}) + .saveToCassandra(KEYSPACE, TABLE, SomeColumns("keyword", "count", "interval")) ssc.checkpoint("/tmp/checkpoint") ssc.start() @@ -56,6 +56,17 @@ object TwitterStreaming { } + private def replaceSmiley(possibleSmiley: String): String = { + possibleSmiley match { + case ":)" => "joy" + case ":-)" => "joy" + case ":(" => "sadness" + case ":-(" => "sadness" + case whatever => whatever + } + } + + private def now(time: Time): String = new DateTime(time.milliseconds, DateTimeZone.UTC).toString("yyyy-MM-dd HH:mm:ss")