main.py
113 lines (93 loc) · 2.78 KB
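"""Index Texas historical marker data from a CSV export into Elasticsearch.

Each row is enriched with extracted years, classifications from
lib.classificator.Classifier, and a lat/lon location converted from its UTM
coordinates, then indexed into the 'thc' index as 'marker' documents.
"""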
import csv
import os
import re

import utm
from elasticsearch import Elasticsearch, NotFoundError
from elasticsearch.helpers import bulk

from lib.classificator import Classifier

INDEX = 'thc'
DOC_TYPE = 'marker'

def find_years(text):
    """Return the unique four-digit years mentioned in ``text``."""
    try:
        matches = re.findall(r"\d{4}", text)
        years = [int(m) for m in matches]
        return list(set(years))
    except TypeError:
        # text is not a string (e.g. the marker text column is empty)
        return []

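# Example with made-up marker text: find_years("Built in 1854, restored in 1936")
# yields [1854, 1936] (derived from a set, so ordering is not guaranteed).
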
def get_data(path='data/Historical Marker_20150521_145030_254.csv'):
    """Yield rows from the marker CSV export, enriched with derived fields."""
    with open(path, 'r') as fp:
        reader = csv.DictReader(fp)
        for row in reader:
            text = row['markertext']
            years = find_years(text)
            row['address'] = row['address'].strip()
            # add our own data
            row['years'] = years
            row['classifications'] = Classifier(years, text=text).classify()
            try:
                # Convert UTM coordinates to lat/lon; the markers are all in
                # the northern hemisphere.
                lat, lon = utm.to_latlon(
                    int(row['utm_east']),
                    int(row['utm_north']),
                    int(row['utm_zone']),
                    northern=True,
                )
                row['location'] = {
                    "lat": lat,
                    "lon": lon,
                }
            except ValueError:
                # UTM fields are missing or malformed; leave 'location' unset.
                pass
            yield row

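# Each yielded row keeps the original CSV columns and gains 'years',
# 'classifications', and, when the UTM fields parse, a 'location' dict that
# matches the geo_point mapping created in set_mapping() below.
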
def set_mapping():
    """Recreate the index with a geo_point mapping for 'location'.

    Inspect the result at http://localhost:9200/thc/marker/_mapping?pretty
    """
    host = os.environ.get('ELASTICSEARCH_HOST', 'localhost')
    connection = Elasticsearch([host])
    connection.indices.delete(index=[INDEX], ignore=[404])
    connection.indices.create(index=INDEX, body={
        'mappings': {
            DOC_TYPE: {
                'properties': {
                    'location': {
                        'type': 'geo_point'
                    }
                }
            }
        }
    })

def to_doc(row):
    """Format a row as an action for the bulk API."""
    return {
        '_index': INDEX,
        '_type': DOC_TYPE,
        '_id': row['atlas_number'],
        # '_source' (rather than 'doc') so the row itself becomes the document
        # body and lines up with the mapping defined in set_mapping().
        '_source': row,
    }

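# A sketch of the action shape that elasticsearch.helpers.bulk() consumes
# (the '_id' value is whatever 'atlas_number' holds for that row):
#   {'_index': 'thc', '_type': 'marker', '_id': <atlas_number>, '_source': {...}}
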
def get_bulk_ready_data():
    """Return an iterator to use with the bulk helper."""
    for row in get_data():
        yield to_doc(row)

def push():
    """Index every marker row into Elasticsearch."""
    host = os.environ.get('ELASTICSEARCH_HOST', 'localhost')
    connection = Elasticsearch([host])
    # Delete old markers, or create the index and mapping on the first run
    try:
        print(connection.delete_by_query(index=[INDEX], doc_type=DOC_TYPE, q='*'))
    except NotFoundError:
        set_mapping()
    if True:
        # Bulk indexing (measured: real 0m9.839s)
        bulk(connection, get_bulk_ready_data())
    else:
        # One request per document, kept for comparison (measured: real 0m30.341s)
        for row in get_data():
            connection.create(
                index=INDEX,
                doc_type=DOC_TYPE,
                body=row,
                id=row['atlas_number'],
            )


if __name__ == '__main__':
    push()
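
# Usage sketch (assumes an Elasticsearch node is reachable at
# ELASTICSEARCH_HOST, which defaults to localhost):
#
#     python main.py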