-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpt8005_influx.py
138 lines (117 loc) · 4.16 KB
/
pt8005_influx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import serial
from datetime import datetime
from influxdb import InfluxDBClient
class pt8005():
''' Serial communication '''
SYNC = b'\xa5'
INDICATOR_DBA = b'\x1B'
INDICATOR_DBC = b'\x1C'
INDICATOR_DATA = b'\x0D'
POWER_OFF = b'\x33'
TRANSMIT = b'\x5A\xAC'
TOGGLE_RECORD = b'\x55'
TOGGLE_DISPLAY_MAX = b'\x11'
TOGGLE_DISPLAY_FAST = b'\x77'
TOGGLE_RANGE = b'\x88'
TOGGLE_DBA_DBC = b'\x99'
def __init__(self, port="/dev/ttyUSB0", baudrate=9600):
serialport = port
try:
self.ser = serial.Serial(serialport, baudrate)
self.ser.isOpen()
self.ser.write(self.TRANSMIT)
except IOError:
print("IO Error: Can not connect to", port)
exit(1)
def _send_cmd(self, cmd):
self.ser.write(cmd)
def off(self):
''' Turn off device '''
self._send_cmd(self.POWER_OFF)
def rec(self):
''' Starts/stops recording in PT8005
Not needed to read data via serial port.
'''
self._send_cmd(self.TOGGLE_RECORD)
def display(self):
''' Switch display mode fast/slow '''
self.ser.write(self.TOGGLE_DISPLAY_FAST)
def range(self):
''' Switch range '''
self.ser.write(self.TOGGLE_RANGE)
def type(self):
''' Switch dBA/dBC '''
self.ser.write(self.TOGGLE_DBA_DBC)
def read(self, num=1):
''' Read data from serial port '''
data = self.ser.read(num)
return data
def output(self, value):
print(value, " ", end="")
for i in range(1, int(value)):
print("*", end="")
print("")
def stream(self):
''' Stream decoding of data sent by the device '''
while 1:
data = self.read(1) # read byte from serial
if data == self.SYNC: # sync found?
data = self.read(1) # read next
if data == self.INDICATOR_DBA:
self.state_dba_dbc = self.INDICATOR_DBA
if data == self.INDICATOR_DBC:
self.state_dba_dbc = self.INDICATOR_DBC
if data == self.INDICATOR_DATA:
# Read two bytes from serial port
data1 = self.read(1) # read next
data2 = self.read(1) # read next
# Byte 1
part1 = data1[0]
# Byte 2 - extract Bit 5-8
part2 = bytes([data2[0] >> 4])
# Byte 2 - extract Bit 1-4
part3 = bytes([data2[0] & 15])
# Build float
value = (part1*10)+int(part2.hex())+(int(part3.hex())/10)
return value
class datadump():
def __init__(self, host="localhost", dbname="pt8005_pa10", port="8086"):
try:
self.client = InfluxDBClient(host, port)
self.client.switch_database(dbname)
except:
print("Some error occured.")
exit(1)
def _create_json(self, value, type):
data = [{"measurement": "Construction Site ABC HH",
"tags": {
"Device": "PT001",
"Room": "1.2.3",
"Floor": "3",
"Name": "Joe",
"Street": "Townhall 10",
"Postcode": "20000",
"City": "Hamburg"
},
"time": str(datetime.now()),
"fields": {
"Type": type,
"Value": value
}
}]
return data
def dump(self, value=0, type="dBA"):
data = self._create_json(value, "dBA")
result = self.client.write_points(data)
return result
def main():
print("Connecting to serial port ...")
pt = pt8005()
print("Connecting to InfluxDB ...")
dd = datadump()
print("Reading data from PeakTech 8005 ...")
while 1:
value = pt.stream() # Read from serialport
dd.dump(value, "dBA") # Write values to InfluxDB
if __name__ == "__main__":
main()