-
Notifications
You must be signed in to change notification settings - Fork 17
/
MqttHandler.cpp
189 lines (181 loc) · 7.23 KB
/
MqttHandler.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
#include "MqttHandler.h"
String getId() {
uint64_t macAddress = ESP.getEfuseMac();
uint64_t macAddressTrunc = macAddress << 40;
uint32_t chipID = macAddressTrunc >> 40;
return String(chipID, HEX);
}
void MqttHandler::begin()
{
if (!isInit && preference.mqtt.host && preference.mqtt.port && preference.mqtt.topic && preference.mqtt.fn) {
#ifdef __debug
Serial.println("[MQTT] init");
#endif
mqtt_client = new PubSubClient(client);
mqtt_client->setServer(preference.mqtt.host, preference.mqtt.port);
mqtt_client->setCallback([this](char* topic, byte* payload, unsigned int length) { callback(topic, payload, length);});
snprintf(topics.config, TOPIC_MAX_SIZE, "%s/%s/config\0", preference.mqtt.topic, preference.mqtt.fn);
snprintf(topics.gpio, TOPIC_MAX_SIZE, "%s/%s/gpio\0", preference.mqtt.topic, preference.mqtt.fn);
snprintf(topics.automation, TOPIC_MAX_SIZE, "%s/%s/automation\0", preference.mqtt.topic, preference.mqtt.fn);
snprintf(topics.debug, TOPIC_MAX_SIZE, "%s/%s/debug\0", preference.mqtt.topic, preference.mqtt.fn);
isInit = true;
}
}
void MqttHandler::handle()
{
begin();
if (isInit && preference.mqtt.active && preference.mqtt.user && preference.mqtt.password && preference.health.mqtt >= 0) {
//MQTT failed retry to connect
if (mqtt_client->state() < MQTT_CONNECTED)
{
connect();
}
//MQTT config problem on MQTT do nothing
else if (mqtt_client->state() > MQTT_CONNECTED ) return;
//MQTT connected send status
else {
if (millis() > lastSend) {
//hpStatusChanged(hp.getStatus());
lastSend = millis();
}
mqtt_client->loop();
}
}
}
void MqttHandler::connect() {
// Loop until we're reconnected
int attempts = 0;
while (!mqtt_client->connected()) {
preference.health.mqtt = 0;
#ifdef __debug
Serial.printf("[MQTT] Connection attempt(%i)\n", attempts);
#endif
// Attempt to connect
mqtt_client->connect(getId().c_str(), preference.mqtt.user, preference.mqtt.password);
// If state < 0 (MQTT_CONNECTED) => network problem we retry RETRY_ATTEMPT times and then waiting for MQTT_RETRY_INTERVAL_MS and retry reapeatly
if (mqtt_client->state() < MQTT_CONNECTED) {
#ifdef __debug
Serial.println("[MQTT] Failed ");
#endif
if (attempts == RETRY_ATTEMPT) {
// Set health to -1 to block any new attempt. This value can be remotely reset to 0 by calling mqtt/retry via the api
preference.health.mqtt = -1;
return;
} else {
delay(250);
attempts++;
}
}
// If state > 0 (MQTT_CONNECTED) => config or server problem we stop retry
else if (mqtt_client->state() > MQTT_CONNECTED) {
#ifdef __debug
Serial.println("[MQTT] server connected");
#endif
preference.health.mqtt = 1;
return;
}
// We are connected
else if (topics.gpio) {
#ifdef __debug
Serial.println("[MQTT] server subscribed");
#endif
char gpiosTopic[strnlen(topics.gpio,TOPIC_MAX_SIZE)+3];
sprintf(gpiosTopic, "%s/+\0",topics.gpio);
mqtt_client->subscribe(gpiosTopic);
char automationsTopic[strnlen(topics.automation,TOPIC_MAX_SIZE)+3];
sprintf(automationsTopic, "%s/+\0",topics.automation);
mqtt_client->subscribe(automationsTopic);
mqtt_client->subscribe(topics.config);
mqtt_client->subscribe(topics.debug);
publishConfig();
}
}
}
void MqttHandler::disconnect() {
if (mqtt_client->state() == MQTT_CONNECTED) {
#ifdef __debug
Serial.println("[MQTT] disconnected");
#endif
mqtt_client->disconnect();
preference.health.mqtt = 0;
}
}
void MqttHandler::callback(char* topic, byte* payload, unsigned int length) {
#ifdef __debug
Serial.printf("[MQTT] callback data on topic %s\n", topic);
#endif
// Copy payload into message buffer
char message[length + 1];
for (int i = 0; i < length; i++) {
message[i] = (char)payload[i];
}
message[length] = '\0';
// Check if topic is contained in the topic payload
if (strstr(topic,this->topics.gpio)) {
// Logic to get what pin was published
int len = (strnlen(topic,TOPIC_MAX_SIZE) - strnlen(this->topics.gpio,TOPIC_MAX_SIZE))+1;
char pin_c[len];
strncpy(pin_c, topic + strnlen(this->topics.gpio,TOPIC_MAX_SIZE)+1, len-1);
pin_c[len] = '\0';
int pin = atoi(pin_c);
int state = atoi(message);
#ifdef __debug
Serial.printf("[MQTT] message %s for pin %i\n", message, pin);
#endif
if (pin && digitalRead(pin) != state) {
// We set the persist flag to false, to allow the mainloop to pick up new changes and react accordingly
preference.setGpioState(pin, state);
} else {
#ifdef __debug
Serial.print("[MQTT] state unchanged. Message dissmissed\n");
#endif
}
} else if (strstr(topic,this->topics.automation)) {
int len = (strnlen(topic,TOPIC_MAX_SIZE) - strnlen(this->topics.automation,TOPIC_MAX_SIZE))+1;
char id_c[len];
strncpy(id_c, topic + strnlen(this->topics.automation,TOPIC_MAX_SIZE)+1, len-1);
id_c[len] = '\0';
int id = atoi(id_c);
int state = atoi(message);
#ifdef __debug
Serial.printf("[MQTT] message %s for automation id %i\n", message, id);
#endif
if (state) {
// queue automation id, to be picked up by esp32.ino script
for (int i=0; i<MAX_AUTOMATIONS_NUMBER; i++) {
if (automationsQueued[i] == 0) {
automationsQueued[i] = id;
break;
}
}
}
}
}
void MqttHandler::publish(int pin){
if (isInit && mqtt_client->state() == MQTT_CONNECTED) {
char pinTopic[strnlen(topics.gpio,TOPIC_MAX_SIZE)+3];
snprintf(pinTopic,strnlen(pinTopic,TOPIC_MAX_SIZE), "%s/%i\0",topics.gpio,pin);
char state[2];
snprintf(state,sizeof(state), "%i\0",digitalRead(pin));
#ifdef __debug
Serial.printf("[MQTT] publishing pin %i state %s on topic: %s\n", pin, state, pinTopic);
#endif
if (!mqtt_client->publish(pinTopic, state, true)) {
#ifdef __debug
Serial.println("[MQTT] failed to publish");
#endif
mqtt_client->publish(topics.debug, (char*)(F("Failed to publish gpios list change")));
}
}
}
void MqttHandler::publishConfig(){
#ifdef __debug
Serial.println("MQTT: publishing configuration");
#endif
if (mqtt_client->publish_P(topics.config, preference.getGpiosJson().c_str(), false)) {
#ifdef __debug
Serial.println("MQTT: failed to publish");
#endif
mqtt_client->publish(topics.debug, (char*)(F("Failed to publish gpios list change")));
}
}