forked from eclipse-chariott/Agemo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmosquitto_connector.rs
215 lines (189 loc) · 7.84 KB
/
mosquitto_connector.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
// SPDX-License-Identifier: MIT
//! Implements the [`PubSubConnector`][`crate::pubsub_connector`] trait for the
//! [Mosquitto MQTT broker](https://mosquitto.org/).
//!
//! Implements the [`PubSubConnector`] trait, allowing for the pub sub service to communicate
//! directly with the Mosquitto MQTT broker. The pub sub service needs to communicate with the
//! broker to monitor the state of topics generated by the service for publishers.
use async_trait::async_trait;
use log::{error, info, warn};
use paho_mqtt::{self as mqtt, MQTT_VERSION_5};
use std::{process, sync::mpsc};
use crate::pubsub_connector::{self, MonitorMessage, PubSubAction, PubSubConnector};
/// Mosquitto broker's reserved topic for subscribe related notifications.
///
/// The update handler in this file reads the subscribed topic name from the fourth
/// whitespace-separated token of the payload posted here.
const SUBSCRIBE: &str = "$SYS/broker/log/M/subscribe";
/// Mosquitto broker's reserved topic for unsubscribe related notifications.
///
/// The update handler in this file reads the unsubscribed topic name from the third
/// whitespace-separated token of the payload posted here.
const UNSUBSCRIBE: &str = "$SYS/broker/log/M/unsubscribe";
/// Constant topic used by a publisher's last will and testament for unclean disconnect.
///
/// The update handler in this file reads the publisher identifier from the second
/// whitespace-separated token of the payload posted here.
const LWT_PUBLISHER: &str = "publisher/disconnect";
/// Connector that reaches the Mosquitto broker through an MQTT v5 client.
///
/// The wrapped client is created by the constructor and reused for connecting, subscribing
/// to the monitored topics, and publishing messages.
pub struct MqttFiveBrokerConnector {
/// Asynchronous paho MQTT client used for every interaction with the broker.
client: mqtt::AsyncClient,
}
impl MqttFiveBrokerConnector {
/// Builds a connector around a freshly created MQTT client.
///
/// The client is only created here; no connection to the broker is attempted.
///
/// # Arguments
///
/// * `client_id` - Id used when creating a new mqtt client.
/// * `broker_uri` - The uri of the broker that the client is connecting to.
///
/// # Exits
///
/// Logs the error and terminates the process with exit code 1 if the client cannot be
/// created.
fn new(client_id: String, broker_uri: String) -> Self {
    let options = mqtt::CreateOptionsBuilder::new()
        .server_uri(broker_uri)
        .client_id(client_id)
        .finalize();

    match mqtt::AsyncClient::new(options) {
        Ok(client) => MqttFiveBrokerConnector { client },
        Err(e) => {
            error!("Error creating the client: {e:?}");
            process::exit(1)
        }
    }
}
/// Translates a notification received from the broker into a [`MonitorMessage`].
///
/// Three topics are understood: the broker's subscribe and unsubscribe log topics and the
/// publisher last will and testament topic. The payload is split on whitespace and the
/// subject of the notification is taken from a fixed position that depends on the topic:
/// the fourth token for a subscribe, the third for an unsubscribe and the second for a
/// publisher last will.
///
/// # Arguments
///
/// * `topic` - The reserved topics used by the broker to provide updates about active topics.
/// * `payload` - The information posted on the reserved topic.
///
/// # Returns
///
/// `Some(MonitorMessage)` when the topic is recognised and the payload has enough tokens;
/// `None` (after logging a warning) for an unknown topic or a payload that is too short.
fn handle_subscription_update(topic: String, payload: String) -> Option<MonitorMessage> {
    // Only one positional token is ever needed, so walk the iterator instead of collecting.
    let mut tokens = payload.split_whitespace();

    match topic.as_str() {
        SUBSCRIBE => match tokens.nth(3) {
            Some(sub_topic) => {
                info!("Added a subscriber to topic '{sub_topic}'.");
                Some(MonitorMessage {
                    context: sub_topic.to_string(),
                    action: PubSubAction::Subscribe,
                })
            }
            None => {
                warn!("Invalid Subscribe: {payload}");
                None
            }
        },
        UNSUBSCRIBE => match tokens.nth(2) {
            Some(sub_topic) => {
                info!("Removed a subscriber from topic '{sub_topic}'.");
                Some(MonitorMessage {
                    context: sub_topic.to_string(),
                    action: PubSubAction::Unsubscribe,
                })
            }
            None => {
                warn!("Invalid Unsubscribe: {payload}");
                None
            }
        },
        LWT_PUBLISHER => match tokens.nth(1) {
            Some(publisher) => {
                info!("LWT received from '{publisher}'.");
                Some(MonitorMessage {
                    context: publisher.to_string(),
                    action: PubSubAction::PubDisconnect,
                })
            }
            None => {
                warn!("Invalid LWT: {payload}");
                None
            }
        },
        _ => {
            warn!("Unknown topic: {topic} with payload: {payload}");
            None
        }
    }
}
/// Registers the message callback and connects the client to the Mosquitto broker.
///
/// The connection is made as an MQTT v5 session with `clean_start(false)`, a session
/// expiry interval property of 3600 and a last will and testament message for the monitor
/// client. The call blocks until the connection attempt completes.
///
/// # Arguments
///
/// * `cb_channel` - Channel used to report back updates from the broker.
/// * `message_cb` - Function for how to handle the update message from the broker.
///
/// # Returns
///
/// `Ok(())` once connected. A failed connection is logged and terminates the process with
/// exit code 1, so no `Err` is currently produced by this function.
fn connect_client(
    &self,
    cb_channel: mpsc::Sender<MonitorMessage>,
    message_cb: fn(MonitorMessage, mpsc::Sender<MonitorMessage>),
) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
    // Last will and testament published for the pub sub monitor client on an unclean
    // disconnect.
    let last_will = mqtt::Message::new(
        "pubsub_monitor_client",
        "Monitor has lost connection",
        mqtt::QOS_1,
    );

    let options = mqtt::ConnectOptionsBuilder::with_mqtt_version(MQTT_VERSION_5)
        .clean_start(false)
        .properties(mqtt::properties![mqtt::PropertyCode::SessionExpiryInterval => 3600])
        .will_message(last_will)
        .finalize();

    // Every incoming message is translated into a monitor message and, when the translation
    // yields one, handed to `message_cb` together with a clone of the reporting channel.
    self.client
        .set_message_callback(move |_client: &mqtt::AsyncClient, incoming| {
            let update = incoming.and_then(|received| {
                Self::handle_subscription_update(
                    received.topic().to_string(),
                    received.payload_str().to_string(),
                )
            });
            if let Some(update) = update {
                message_cb(update, cb_channel.clone());
            }
        });

    // Block until the broker accepts or rejects the connection.
    match self.client.connect(options).wait() {
        Ok(_) => Ok(()),
        Err(err) => {
            error!("Unable to connect: {err}");
            process::exit(1)
        }
    }
}
/// Handles a subscription to a given topic.
async fn subscribe(&mut self, topic_name: String) {
let _res = self.client.subscribe(topic_name.clone(), mqtt::QOS_1).await;
}
/// Publishes a message to the given topic with QoS 1.
///
/// # Arguments
///
/// * `topic_name` - The topic to publish on.
/// * `msg` - The payload to publish.
///
/// # Errors
///
/// Returns the client's error if the publish does not complete successfully.
async fn publish(
    &self,
    topic_name: String,
    msg: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // `topic_name` is owned and not needed afterwards, so it is moved rather than cloned.
    let message = mqtt::Message::new(topic_name, msg, mqtt::QOS_1);
    self.client.publish(message).await?;
    Ok(())
}
}
#[async_trait]
impl PubSubConnector for MqttFiveBrokerConnector {
fn new(client_id: String, uri: String) -> Self {
Self::new(client_id, uri)
}
/// Connects to the broker and subscribes to the topics used to monitor topic state.
///
/// Updates received on the monitored topics are forwarded through
/// `pubsub_connector::update_topic_information` together with `cb_channel`.
///
/// # Arguments
///
/// * `cb_channel` - Channel used to report back updates from the broker.
///
/// # Errors
///
/// Returns the connection error if connecting the client fails; in that case no
/// subscription is attempted.
async fn monitor_topics(
    &mut self,
    cb_channel: mpsc::Sender<MonitorMessage>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Connect to broker with mqtt client. Pass message_cb that handles sending data back to
    // the subscriber.
    info!("Connecting to MQTT server...");
    let message_cb = pubsub_connector::update_topic_information;

    // Subscribing on a client that never connected cannot succeed, so report the failure to
    // the caller instead of carrying on.
    if let Err(err) = Self::connect_client(self, cb_channel, message_cb) {
        error!("Failed to connect to MQTT server...");
        return Err(err);
    }

    for topic in [SUBSCRIBE, UNSUBSCRIBE, LWT_PUBLISHER] {
        Self::subscribe(self, topic.to_string()).await;
    }

    Ok(())
}
async fn delete_topic(
&self,
topic: String,
deletion_msg: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
Self::publish(self, topic, deletion_msg).await
}
}