More Efficient IO Version
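A Spark Streaming job that consumes tweets from the Kafka topic twitter_alpha and appends each tweet's text to a CSV file, buffering writes so the file is opened once per micro-batch rather than once per record.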
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.json.JSONArray;
import org.json.JSONObject;

import java.io.FileWriter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
public class TweetAnalyzer {

    // Per-batch buffer of tweet text. StringBuffer (synchronized) rather than
    // String concatenation, since local[*] can run partition tasks concurrently.
    private static final StringBuffer dataSet = new StringBuffer();

    private static void setString(String s) {
        dataSet.append(s);
    }

    private static String getString() {
        return dataSet.toString();
    }

    // Append the buffered batch to the CSV file in one write, then reset the
    // buffer. Opening the file once per batch is the "more efficient IO" here,
    // versus one FileWriter per record in the earlier version.
    private static void writeCSV() throws IOException {
        try (FileWriter writer = new FileWriter("/Users/kaikin/test.csv", true)) {
            writer.append(dataSet);
        }
        dataSet.setLength(0);
    }
    public static void main(String[] args) throws InterruptedException {
        // Local StreamingContext using all available cores and a 10-second batch interval.
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("tweet");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "group11");

        Collection<String> topics = Arrays.asList("twitter_alpha");

        JavaInputDStream<ConsumerRecord<String, String>> kafkaStream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );
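
        // Direct stream (no receiver): each Kafka partition maps 1:1 to a Spark
        // partition, and PreferConsistent spreads partitions evenly across executors.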
        kafkaStream.foreachRDD(rdd -> {
            rdd.foreach(record -> {
                // The key arrives as a quoted string, so wrapping key and value
                // yields a parseable document of the form {"key": [tweet]};
                // substring(1, length() - 1) strips the quotes to look the key up.
                final JSONObject obj = new JSONObject("{" + record.key() + ":[" + record.value() + "]}");
                final JSONArray wholeTweet = obj.getJSONArray(record.key().substring(1, record.key().length() - 1));
                String text = wholeTweet.getJSONObject(0).getString("text");
                // Buffer the text rather than opening a FileWriter per record.
                // Mutating static state from rdd.foreach only works in local mode,
                // where executors share the driver JVM; on a cluster, collect the
                // records to the driver first.
                setString(text + "\n");
                System.out.println(getString()); // debug: dump the buffer so far
            });
            // One file append per micro-batch.
            writeCSV();
        });

        jssc.start();
        jssc.awaitTermination();
    }
}
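
A standalone sketch of the key/value re-wrapping that foreachRDD performs, useful for testing the parsing logic without Kafka or Spark. The sample key and value are hypothetical; the producer's actual format is only inferred from the parsing code above.

import org.json.JSONArray;
import org.json.JSONObject;

public class PayloadParseDemo {
    public static void main(String[] args) {
        // Hypothetical record: a quoted key, and a single tweet object as the value.
        String key = "\"tweets\"";
        String value = "{\"text\":\"hello world\"}";
        // Same reconstruction as TweetAnalyzer: {"tweets":[{"text":"hello world"}]}
        JSONObject obj = new JSONObject("{" + key + ":[" + value + "]}");
        JSONArray wholeTweet = obj.getJSONArray(key.substring(1, key.length() - 1));
        System.out.println(wholeTweet.getJSONObject(0).getString("text")); // prints: hello world
    }
}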