-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathget_citadel_data.py
66 lines (52 loc) · 1.93 KB
/
get_citadel_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# Python3
import requests
import datetime
import json
import time
import os
# Helper Methods
def next_week(current_week):
    """Return the datetime exactly one week after *current_week*."""
    one_week = datetime.timedelta(weeks=1)
    return current_week + one_week
def prev_week(current_week):
    """Return the datetime exactly one week before *current_week*."""
    one_week = datetime.timedelta(weeks=1)
    return current_week - one_week
def get_timestamps():
    """Yield (start, end) Unix-timestamp pairs, one pair per week.

    The first pair covers the week starting now; each subsequent pair
    steps one week further back in time.  Infinite generator.
    """
    week = datetime.timedelta(days=7)
    cursor = datetime.datetime.now()
    while True:
        yield int(cursor.timestamp()), int((cursor + week).timestamp())
        cursor -= week
# Download and cache all timeseries from the Citadel API
def get_timeseries_from_api():
    """Download time-series data for every Citadel point and cache it as JSON.

    For each point UUID the API is queried one week at a time, walking
    backwards from now (via get_timestamps).  Collection for a point stops
    after 20 consecutive empty weeks, or when a response hits the API's
    10000-row limit; the accumulated data is then written to
    data/<uuid>.json.
    """
    data_dir = 'data/'
    # First get information about all points in Citadel
    r = requests.get('https://citadel.ucsd.edu/api/point/')
    uuids = sorted(point['uuid'] for point in r.json()['point_list'])
    # Get time series data for every Citadel point
    for i, uuid in enumerate(uuids):
        print('\n\n', flush=True)
        print(str(i) + '\t' + uuid, flush=True)
        timestamps = get_timestamps()
        url = 'https://citadel.ucsd.edu/api/point/' + uuid + '/timeseries'
        sensor_data = {'data': {}}
        num_empty = 0
        for start_time, end_time in timestamps:
            time.sleep(1)  # throttle so we don't hammer the API
            r = requests.get(url, params={'start_time': start_time, 'end_time': end_time})
            week_data = r.json()['data']  # parse the response once, not four times
            sensor_data['data'].update(week_data)
            print('\t' + str(len(week_data)), flush=True)
            if len(week_data) == 0:
                num_empty += 1
            elif len(week_data) == 10000:
                # BUG FIX: the original `return` here aborted the whole
                # download, discarding this sensor's partial data and
                # skipping every remaining UUID.  Stop querying this one
                # point instead and still save what was collected.
                print('Found 10000 limit', flush=True)
                break
            else:
                num_empty = 0
            if num_empty >= 20:
                break
        # Context manager guarantees the file is closed even if dump raises
        with open(data_dir + uuid + '.json', 'w') as f:
            json.dump(sensor_data, f)
if __name__ == '__main__':
    # Ensure the output directory exists before downloading
    os.makedirs('data/', exist_ok=True)
    get_timeseries_from_api()