-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtower.py
executable file
·55 lines (49 loc) · 2.02 KB
/
tower.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#!/usr/bin/env python3
import serial, json, threading, time, os
from housepy import config, log, util
SOURCE = "tower"
class WeatherStation(threading.Thread):
def __init__(self, data_sender=None):
threading.Thread.__init__(self)
self.daemon = True
self.data_sender = data_sender
self.start()
def run(self):
try:
device_name = None
for dn in os.listdir("/dev"):
if "tty.usbmodem" in dn:
device_name = os.path.join("/dev", dn)
break
if "ttyACM0" in dn:
device_name = os.path.join("/dev", dn)
break
if device_name is None:
log.info("No devices available")
exit()
connection = serial.Serial(device_name, 9600)
log.info("Receiving xbee messages on %s" % device_name)
except Exception as e:
log.error(log.exc(e))
else:
while True:
result = None
try:
result = connection.readline().decode('utf-8').strip()
data = json.loads(result)
data.update({'source': SOURCE})
log.info(json.dumps(data, indent=4))
if self.data_sender is not None:
self.data_sender.queue.put(data)
# make another entry for GPS
data = {key: value for (key, value) in data.items() if key in ['latitude', 'longitude', 'altitude_m', 'satellites']}
data.update({'source': "gps"})
if self.data_sender is not None:
self.data_sender.queue.put(data)
except Exception as e:
log.error(log.exc(e))
log.info(result)
if __name__ == "__main__":
weather_station = WeatherStation()
while True:
time.sleep(0.1)