-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmqtt.py
172 lines (156 loc) · 7.6 KB
/
mqtt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C)2018-24 Maurizio Montel (dr-prodigy) <[email protected]>
# This file is part of hompi <https://github.com/dr-prodigy/hompi>.
#
# hompi is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# hompi is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with hompi. If not, see <http://www.gnu.org/licenses/>.
import config
import re
import time
from datetime import datetime, timedelta
from utils import LOG_INFO, log_stdout, log_stderr, LOG_DEBUG, LOG_WARN
from paho.mqtt import client as mqtt_client
from paho.mqtt.enums import CallbackAPIVersion
# Seconds the connection routine waits for the on_connect callback to report
# success before giving up on the current attempt.
CONNECT_TIMEOUT_SECS = 5
# Minutes added to publish_time when connecting to the broker raises
# (the same value is quoted in the publish-failure log message).
RETRY_MINUTES = 2
# Earliest moment at which MQTT.update_areas() performs any work; initialised
# to "now" at import time and moved into the future to back off after errors.
publish_time = datetime.now()
class MQTT:
    """Bridge between hompi areas and an MQTT broker.

    Areas are declared with register(). update_areas() then lazily connects
    to the broker named by ``config.MQTT_BROKER`` / ``config.MQTT_PORT``,
    subscribes to every registered area topic and publishes the pending
    setpoints found in ``io_status.areas``. Incoming messages are parsed with
    the per-area regular expressions and written back into
    ``io_status.areas``.
    """

    # Upper bound (seconds) spent waiting for the network thread to hand a
    # freshly queued message to the socket before the publish counts as failed.
    _PUBLISH_TIMEOUT_SECS = 1

    def __init__(self, io_status):
        """Store the shared status object; no network activity happens here.

        :param io_status: object exposing an ``areas`` mapping keyed by area id.
        """
        self.__connected = False
        self.__areas = {}
        self.__io_status = io_status
        self.__client = None

    def __close_client(self):
        """Disconnect and stop the current client (if any) and forget it."""
        stale, self.__client = self.__client, None
        self.__connected = False
        if stale is None:
            return
        # disconnect() first so the network thread can flush the DISCONNECT
        # packet, then join that thread.
        stale.disconnect()
        stale.loop_stop()

    def __connect(self):
        """Ensure a connected client exists; return the connection state.

        Any exception raised while connecting is logged and pushes
        ``publish_time`` RETRY_MINUTES into the future.
        """
        global publish_time
        # lazy MQTT server connection
        if not self.__client or not self.__connected:
            try:
                # A previous client may still own a running network thread:
                # tear it down, otherwise every retry leaks a thread/socket
                # whose callbacks keep touching this instance.
                self.__close_client()
                self.__client = self.__connect_mqtt()
            except Exception as e:
                log_stderr('*MQTT* - Failed to connect: {} -> delaying {} mins'.format(e, RETRY_MINUTES))
                publish_time = datetime.now() + timedelta(minutes=RETRY_MINUTES)
        return self.__connected

    def __connect_mqtt(self) -> "mqtt_client.Client":
        """Create a client, start its network thread and wait for the connection.

        Waits at most CONNECT_TIMEOUT_SECS for on_connect to report success.
        The client is returned even when that wait times out; callers must
        check ``self.__connected``.

        :raises Exception: whatever ``Client.connect`` raises.
        """
        def on_connect(client, userdata, flags, rc, properties):
            if rc == 0:
                log_stdout('MQTT', 'Connected to broker {}:{}'
                           .format(config.MQTT_BROKER, config.MQTT_PORT), LOG_INFO)
                # Reset the flags BEFORE announcing the connection, so a
                # subscription made by the waiting caller cannot be wiped
                # right after it was recorded.
                for area in list(self.__areas.values()):
                    area['subscribed'] = False
                self.__connected = True
            else:
                log_stderr('*MQTT* - Failed to connect to broker {}:{}: {}'.
                           format(config.MQTT_BROKER, config.MQTT_PORT, rc))

        def on_disconnect(client, userdata, flags, rc, properties):
            if rc == 0:
                # successful disconnect
                log_stdout('MQTT', 'Disconnected: ok', LOG_INFO)
            else:
                # error processing
                log_stderr('*MQTT* - Failed to disconnect: {}'.format(rc))
            self.__connected = False

        self.__connected = False
        _client = mqtt_client.Client(CallbackAPIVersion.VERSION2)
        # client.username_pw_set(username, password)
        _client.on_connect = on_connect
        _client.on_disconnect = on_disconnect
        # Installed once per client instead of on every subscription.
        _client.on_message = self.__on_message
        _client.connect(config.MQTT_BROKER, config.MQTT_PORT)
        _client.loop_start()
        # Monotonic clock: the timeout must not be affected by wall-clock jumps.
        deadline = time.monotonic() + CONNECT_TIMEOUT_SECS
        while not self.__connected and time.monotonic() < deadline:
            time.sleep(.1)
        if config.LOG_LEVEL == LOG_DEBUG:
            _client.subscribe("$SYS/broker/log/#")
        return _client

    @staticmethod
    def __extract_temp(pattern, payload, default):
        """Return group 1 of ``pattern`` in ``payload`` as a float.

        ``default`` is returned when the pattern does not match, has no
        group 1, or the captured text is not a number.
        """
        found = re.search(pattern, payload)
        if not found:
            return default
        try:
            return float(found.group(1))
        except (IndexError, ValueError) as e:
            # An exception escaping a paho callback can stop the network
            # thread, so malformed values fall back to the sentinel instead.
            log_stdout('MQTT', 'Unparsable temperature ({}) -> using {}'.format(e, default), LOG_WARN)
            return default

    def __on_message(self, client, userdata, msg):
        """Handle an incoming message: log it and update the matching areas."""
        # Decode once; undecodable bytes are replaced rather than raising.
        payload = msg.payload.decode(errors='replace')
        msg_topic = "DEBUG" if msg.topic.startswith("$SYS/broker/log/") else msg.topic
        log_stdout('MQTT', '({}) -> {}'.format(msg_topic, payload), LOG_DEBUG)
        # topic decoding
        for area_id, area in list(self.__areas.items()):
            if area['topic'] != msg.topic:
                continue
            cur_area = self.__io_status.areas[area_id]
            cur_area["last_update"] = datetime.now().isoformat()
            cur_area['temp_calibration'] = area['calibration']
            # 999 is the value stored when no current temperature is found.
            cur_temp_c = self.__extract_temp(area['cur_temp_c_regex'], payload, 999)
            if area['req_temp_c_regex']:
                # NOTE(review): a payload without a setpoint yields 0 and is
                # then treated as a manual change - confirm this is intended.
                req_temp_c = self.__extract_temp(area['req_temp_c_regex'], payload, 0)
                if cur_area['req_temp_c'] != req_temp_c:
                    cur_area['req_temp_c'] = req_temp_c
                    cur_area['manual_set'] = True
            else:
                req_temp_c = "-"
            cur_area['cur_temp_c'] = cur_temp_c
            log_stdout('MQTT', '{} update - cur_temp_c: {} - req_temp_c: {}'.
                       format(area['mqtt_name'], cur_temp_c, req_temp_c), LOG_INFO)

    def __subscribe(self, topic):
        """Subscribe the current client to ``topic``."""
        self.__client.subscribe(topic)

    def __publish(self, area_id, req_temp_c, calibration):
        """Publish the setpoint and calibration of one area to its TRV topic.

        Returns True only when the message was handed to the broker
        connection. Returns False when the area has no TRV name, when not
        connected, or when publishing fails; in the last case
        ``publish_time`` is pushed RETRY_MINUTES into the future, as the log
        message states.

        :raises KeyError: if ``area_id`` was never registered.
        """
        global publish_time
        area = self.__areas[area_id]
        if not area['mqtt_trv_name']:
            return False
        topic = '{}/{}/set'.format(config.MQTT_BASE_TOPIC, area['mqtt_trv_name'])
        payload = (area['mqtt_trv_publish_payload']
                   .replace('**TEMP**', str(req_temp_c))
                   .replace('**TEMP_CAL**', str(calibration)))
        if not self.__connected:
            log_stdout('MQTT', 'Not connected - Publish SKIPPED {} -> ({})'.
                       format(payload, topic), LOG_WARN)
            return False
        try:
            info = self.__client.publish(topic, payload)
            # The message is sent by the network thread: give it a moment
            # instead of testing is_published() before it had a chance to run.
            info.wait_for_publish(timeout=self._PUBLISH_TIMEOUT_SECS)
            delivered = info.is_published()
        except (ValueError, RuntimeError) as e:
            # Raised by paho for invalid arguments, a full queue or a failed send.
            log_stderr('*MQTT* - Publish error: {}'.format(e))
            delivered = False
        if delivered:
            log_stdout('MQTT', 'Area: {} - Publish: req. temp.: {}, calibration: {}'.
                       format(area['area_name'], req_temp_c, calibration), LOG_INFO)
            return True
        log_stdout('MQTT', '{} publish failed -> delaying {} mins'.
                   format(topic, RETRY_MINUTES), LOG_WARN)
        publish_time = datetime.now() + timedelta(minutes=RETRY_MINUTES)
        return False

    def update_areas(self):
        """Subscribe pending area topics and publish pending area updates.

        Does nothing while the module-level ``publish_time`` lies in the
        future, and stops early as soon as the broker cannot be reached.
        """
        if datetime.now() < publish_time:
            return
        # subscribe to topics
        for area in self.__areas.values():
            if area['subscribed']:
                continue
            if not self.__connect():
                return
            self.__subscribe(area['topic'])
            area['subscribed'] = True
            log_stdout('MQTT',
                       'Area: {} - subscribe ({})'.format(area['area_name'], area['topic']), LOG_INFO)
        # publish updates
        for area_id in list(self.__io_status.areas.keys()):
            area = self.__io_status.areas[area_id]
            if area['published']:
                continue
            if not self.__connect():
                return
            area['published'] = self.__publish(area_id, area['req_temp_c'], area['temp_calibration'])

    def register(self, area_id, area_name,
                 mqtt_name, cur_temp_c_regex, req_temp_c_regex, calibration,
                 mqtt_trv_name, mqtt_trv_publish_payload):
        """Declare (or replace) the MQTT settings of an area.

        :param area_id: key of the area, also used to index ``io_status.areas``.
        :param area_name: name used in log messages.
        :param mqtt_name: appended to ``config.MQTT_BASE_TOPIC`` to build the
            topic the area listens on.
        :param cur_temp_c_regex: pattern whose group 1 is the current temperature.
        :param req_temp_c_regex: pattern whose group 1 is the requested
            temperature; a falsy value disables setpoint parsing.
        :param calibration: copied to the area's ``temp_calibration`` on every
            received message.
        :param mqtt_trv_name: builds the ``<base>/<name>/set`` publish topic;
            a falsy value disables publishing for the area.
        :param mqtt_trv_publish_payload: template in which ``**TEMP**`` and
            ``**TEMP_CAL**`` are substituted before publishing.
        """
        self.__areas[area_id] = {
            'area_name': area_name,
            'mqtt_name': mqtt_name,
            'topic': '{}/{}'.format(config.MQTT_BASE_TOPIC, mqtt_name),
            'cur_temp_c_regex': cur_temp_c_regex,
            'req_temp_c_regex': req_temp_c_regex,
            'calibration': calibration,
            'mqtt_trv_name': mqtt_trv_name,
            'mqtt_trv_publish_payload': mqtt_trv_publish_payload,
            'subscribed': False,
        }

    def cleanup(self):
        """Shut the client down and drop every registered area."""
        log_stdout('MQTT', 'Cleanup', LOG_INFO)
        # Also resets the client/connected state, so a later update_areas()
        # starts from a clean slate instead of trusting a dead client.
        self.__close_client()
        self.__areas.clear()