-
Notifications
You must be signed in to change notification settings - Fork 14
/
read_dht.py
135 lines (114 loc) · 4.89 KB
/
read_dht.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env python3
# This file is part of HoneyPi [honey-pi.de] which is released under Creative Commons License Attribution-NonCommercial-ShareAlike 3.0 Unported (CC BY-NC-SA 3.0).
# See file LICENSE or go to http://creativecommons.org/licenses/by-nc-sa/3.0/ for full license details.
# This script requires:
# sudo pip3 install adafruit-circuitpython-dht
# sudo apt-get install libgpiod2
from utilities import blockPrinting
import psutil # for process killing (sudo apt-get install python3-psutil)
import logging
logger = logging.getLogger('HoneyPi.read_dht')
import time
try:
import adafruit_dht
except ImportError as ex:
logger.error("read_dht.py ImportError while importing adafruit_dht " + str(ex))
try:
import digitalio
except ImportError as ex:
logger.error("ImportError while importing digitalio " + str(ex))
@blockPrinting
def measure_dht(ts_sensor):
fields = {}
timer = 1
max_timer = 8
errorMessage = ""
dht_type = 22
pin = 0
temperature = None
humidity = None
dht = None
if 'dht_type' in ts_sensor and ts_sensor["dht_type"] is not None:
dht_type = int(ts_sensor["dht_type"])
else:
logger.warning("DHT type not defined, using DHT22 by default.")
dht_type = 22
if 'pin' in ts_sensor and ts_sensor["pin"] is not None:
pin = int(ts_sensor["pin"])
else:
logger.error("DHT GPIO-Pin not defined!")
return fields
try:
SENSOR_PIN = digitalio.Pin(pin)
except NameError as ex:
logger.error("NameError reading DHT " + str(ex))
return fields
except Exception as ex:
errorMessage = "Setting up DHT PIN failed for GPIO: " + str(pin)
logger.error(errorMessage)
return fields
try:
for proc in psutil.process_iter():
proc_name = proc.name()
if proc_name == 'libgpiod_pulsein' or proc_name == 'libgpiod_pulsei':
proc.kill()
logger.debug("Killed process " + proc_name)
except:
logger.error("Exception occured while terminating libgpiod_pulsein. Likely the process was already killed.")
while timer <= max_timer:
try:
# setup sensor
if dht_type == 2302:
dht = adafruit_dht.DHT22(SENSOR_PIN, use_pulseio=True)
elif dht_type == 11:
dht = adafruit_dht.DHT11(SENSOR_PIN, use_pulseio=True)
else:
dht = adafruit_dht.DHT22(SENSOR_PIN, use_pulseio=True)
temperature = dht.temperature
humidity = dht.humidity
dht.exit()
break # break while if no Exception occured
except RuntimeError as error:
# Errors happen fairly often, DHT's are hard to read, just keep going
logger.debug("Failed reading DHT ("+str(timer)+"/"+str(max_timer)+"): " + error.args[0])
time.sleep(1)
timer = timer + 1
pass
except:
# Errors happen fairly often, DHT's are hard to read, just keep going
logger.debug("Failed reading DHT ("+str(timer)+"/"+str(max_timer)+"): Unhandled Exception")
time.sleep(1)
timer = timer + 1
try:
if dht is not None:
dht.exit()
else:
logger.debug("dht is None which means an exception broke the dht initialising.")
except:
logger.exception("Unhandled Exception in dht.exit()")
pass
if timer >= max_timer: # end reached
logger.error("Failed reading DHT ("+str(timer-1)+" times) on GPIO " + str(pin))
return fields
# Create returned dict if ts-field is defined
if 'ts_field_temperature' in ts_sensor and temperature is not None:
if 'offset' in ts_sensor and ts_sensor["offset"] is not None:
temperature = temperature-float(ts_sensor["offset"])
fields[ts_sensor["ts_field_temperature"]] = round(temperature, 1)
if 'ts_field_humidity' in ts_sensor and humidity is not None:
fields[ts_sensor["ts_field_humidity"]] = round(humidity, 1)
return fields
# For testing you can call this script directly (python3 read_dht.py)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
try:
#fields = measure_dht ({"dht_type" : 2302, "pin" : 5, 'ts_field_temperature': "temperature", 'ts_field_humidity': "humidity"})
fields = measure_dht ({"dht_type" : 22, "pin" : 4, 'ts_field_temperature': "temperature", 'ts_field_humidity': "humidity"})
if fields != {}:
print("Temp: {:.1f} F / {:.1f} °C Humidity: {}% ".format(fields['temperature']* (9 / 5) + 32, fields['temperature'], fields['humidity']))
except (KeyboardInterrupt, SystemExit):
pass
except RuntimeError as error:
logger.error("RuntimeError in DHT measurement: " + error.args[0])
except Exception as e:
logger.exception("Unhandled Exception in DHT measurement")