-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvisualization.py
91 lines (81 loc) · 2.95 KB
/
visualization.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
from datetime import datetime as dt
from datetime import timezone

from elasticsearch import Elasticsearch
class Visualizer(object):
    """Load tweet-like records into a local Elasticsearch index for visualization.

    All state (index/type names, the running document id and the client) lives
    on the class, so the methods are static and are called as
    ``Visualizer.init()`` / ``Visualizer.upload(...)``.
    """

    INDEX_NAME = "twitter"
    TYPE_NAME = "tweets"
    # Next document id to assign; incremented once per uploaded record.
    # It restarts at 0 in every process, which matches init() wiping the index.
    ID_FILE = 0
    ES_HOST = {"host": "localhost", "port": 9200}
    # Field name for each position of an input record (see upload()).
    header = ['geo', 'content', 'sentiment', 'time']
    # NOTE(review): credentials are hard-coded; consider reading them from
    # configuration or the environment instead.
    es = Elasticsearch(hosts=[ES_HOST], http_auth="elastic:elastic")

    @staticmethod
    def init():
        """Delete the index if it already exists, then recreate it with the expected mapping.

        The mapping declares ``time`` as a date in ``MM/dd/yyyy HH:mm:ss``
        format (the format upload() writes) and ``geo.coordinates`` as a
        ``geo_point``.

        :return: None; the client responses are printed.
        """
        # Pass the index by keyword: newer elasticsearch-py releases make API
        # arguments keyword-only, so a positional call raises TypeError there.
        if Visualizer.es.indices.exists(index=Visualizer.INDEX_NAME):
            print("deleting '%s' index..." % Visualizer.INDEX_NAME)
            res = Visualizer.es.indices.delete(index=Visualizer.INDEX_NAME)
            print(" response: '%s'" % res)
        request_body = {
            "settings": {
                "number_of_shards": 2,
                "number_of_replicas": 2
            },
            # NOTE(review): a mapping keyed by a type name ("tweets") is only
            # accepted by clusters that still support mapping types — confirm
            # against the target Elasticsearch version.
            "mappings": {
                "tweets": {
                    "properties": {
                        "time": {
                            "type": "date",
                            "format": "MM/dd/yyyy HH:mm:ss"
                        },
                        "geo": {
                            "properties": {
                                "coordinates": {
                                    "type": "geo_point"
                                }
                            }
                        }
                    }
                }
            }
        }
        print("creating '%s' index..." % Visualizer.INDEX_NAME)
        res = Visualizer.es.indices.create(index=Visualizer.INDEX_NAME, body=request_body)
        print(" response for create: '%s'" % res)

    @staticmethod
    def _to_document(tup):
        """Convert one input record into the document body to index.

        :param tup: sequence whose first item is a ``"lon,lat"`` string; the
            remaining items map positionally onto ``header[1:]``.
        :return: dict keyed by the names in ``header``.
        :raises ValueError: if the record has more fields than ``header`` or
            the location string does not contain two comma-separated parts.
        """
        if len(tup) > len(Visualizer.header):
            raise ValueError("record has %d fields, at most %d are supported: %r"
                             % (len(tup), len(Visualizer.header), tup))
        location = tup[0].split(",")
        if len(location) < 2:
            raise ValueError("expected a 'lon,lat' location string, got %r" % (tup[0],))
        document = {
            Visualizer.header[0]: {
                "coordinates": {"lat": float(location[1]), "lon": float(location[0])}
            }
        }
        for key, value in zip(Visualizer.header[1:], tup[1:]):
            document[key] = value
        # The timestamp always overwrites any caller-supplied time. Elasticsearch
        # treats zone-less dates as UTC, so generate UTC wall time rather than
        # local time to avoid shifted timestamps on non-UTC hosts.
        document[Visualizer.header[-1]] = dt.now(timezone.utc).strftime("%m/%d/%Y %H:%M:%S")
        return document

    @staticmethod
    def upload(source):
        """Reformat records and bulk-index them into ``INDEX_NAME``.

        :param source: iterable of records; each record's first item is a
            ``"lon,lat"`` string, followed by content and sentiment.
        :return: None; the bulk response is printed. An empty ``source``
            performs no request.
        """
        bulk_data = []
        for tup in source:
            # Build the document first so a malformed record does not consume an id.
            document = Visualizer._to_document(tup)
            bulk_data.append({
                "index": {
                    "_index": Visualizer.INDEX_NAME,
                    "_type": Visualizer.TYPE_NAME,
                    "_id": Visualizer.ID_FILE
                }
            })
            bulk_data.append(document)
            Visualizer.ID_FILE += 1
        if not bulk_data:
            # The client rejects an empty bulk body, so skip the request.
            print(" nothing to upload")
            return
        res = Visualizer.es.bulk(index=Visualizer.INDEX_NAME, body=bulk_data, refresh=True)
        print(" response for bulk: '%s'" % res)
if __name__ == '__main__':
    # Demo: recreate the index, then upload three sample records.
    Visualizer.init()
    # upload() expects the first field to be a "longitude,latitude" string.
    source = [
        ['116.4074,39.9042', 'Hello World', 'good'],    # Beijing
        ['121.4737,31.2304', 'roasted DUck', 'bad'],    # Shanghai
        ['114.0579,22.5431', 'petrabbit', 'neutral'],   # Shenzhen
    ]
    Visualizer.upload(source)