-
Notifications
You must be signed in to change notification settings - Fork 71
How to use with actix #146
Comments
No, but I used both. In theory this should be a The notifications need to be received in a dedicated thread: There is a project actix-broker that might help to distribute the |
I've been working on a proof of concept that uses actix/rumqtt. The actor implementation is below, as the TODO notes show, it's pretty rough, no real error handling/graceful exit yet. But provided the mailbox/notification channel sizes are large enough it seems to be able to handle the deluge of messages from a public MQTT server. use actix::prelude::*;
use futures::future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
/// MQTT errors.
#[derive(Debug, Fail)]
pub enum Error {
#[fail(display = "MqttError::FromUtf8 {}", _0)]
FromUtf8(std::string::FromUtf8Error),
}
/// MQTT user, password configuration.
#[derive(Debug, Clone)]
pub struct UserPasswordConfiguration {
pub user: String,
pub password: String,
}
/// MQTT topic configuration.
#[derive(Debug, Clone)]
pub struct TopicConfiguration {
pub topic: String,
pub qos: u8,
}
/// MQTT configuration.
#[derive(Debug, Clone)]
pub struct Configuration {
pub mailbox_capacity: usize,
pub client_id: String,
pub host: String,
pub port: u16,
pub user_password: Option<UserPasswordConfiguration>,
pub topics: Vec<TopicConfiguration>,
pub reconnnect_s: u64,
pub keep_alive_s: u16,
pub clean_session: bool,
pub notification_channel_capacity: usize,
}
impl Default for Configuration {
fn default() -> Self {
Configuration {
mailbox_capacity: 512,
client_id: "a7g87sdg7dsg7sd9g70s".to_owned(),
host: "test.mosquitto.org".to_owned(),
port: 1883,
user_password: None,
topics: vec![TopicConfiguration {
topic: "#".to_owned(),
qos: 1,
}],
reconnnect_s: 5,
keep_alive_s: 10,
clean_session: false,
notification_channel_capacity: 512,
}
}
}
/// MQTT actor.
pub struct Actor {
/// MQTT configuration.
pub configuration: Arc<Configuration>,
/// MQTT client handle.
/// Created with None and assigned Some in actor `started` function.
pub mqtt_client: Option<rumqtt::MqttClient>,
/// MQTT notifications thread handle.
/// Created with None and assigned Some in actor `started` function.
pub thread_handle: Option<std::thread::JoinHandle<()>>,
/// Publish message counter.
pub publish_counter: AtomicUsize,
}
impl Actor {
/// Start instance of actor with configuration.
pub fn start(configuration: &Configuration) -> Addr<Actor> {
let configuration = Arc::new(configuration.clone());
Supervisor::start(|_| Actor {
configuration,
mqtt_client: None,
thread_handle: None,
publish_counter: AtomicUsize::new(0),
})
}
/// MQTT client options from configuration.
pub fn mqtt_client_options(&self) -> rumqtt::MqttOptions {
// TODO: Check client disconnection/reconnection/error handling.
let reconnection_options =
rumqtt::ReconnectOptions::AfterFirstSuccess(self.configuration.reconnnect_s);
let options = rumqtt::MqttOptions::new(
self.configuration.client_id.to_owned(),
self.configuration.host.to_owned(),
self.configuration.port,
)
.set_keep_alive(self.configuration.keep_alive_s)
.set_clean_session(self.configuration.clean_session)
.set_reconnect_opts(reconnection_options)
.set_notification_channel_capacity(self.configuration.notification_channel_capacity);
match &self.configuration.user_password {
Some(user_password) => {
options.set_security_opts(rumqtt::SecurityOptions::UsernamePassword(
user_password.user.to_owned(),
user_password.password.to_owned(),
))
}
None => options,
}
}
/// Send publish message to actor.
pub fn try_send_publish(addr: &Addr<Actor>, publish: Publish) -> () {
match addr.try_send(publish) {
Ok(_) => {}
Err(err) => {
// TODO: How to handle send errors.
error!("{}", err);
}
};
}
/// Spawn thread to iterate over notifications channel.
pub fn thread_spawn_notifications(
&self,
notifications: rumqtt::Receiver<rumqtt::Notification>,
addr: Addr<Actor>,
) -> std::thread::JoinHandle<()> {
// TODO: How to handle process exit while messages being handled (ack later?).
std::thread::spawn(move || {
for notification in notifications {
match notification {
rumqtt::Notification::Publish(publish) => {
// Convert to publish message and send to actor.
match Publish::from_mqtt311_publish(publish) {
Ok(publish) => Actor::try_send_publish(&addr, publish),
// Messages that are not JSON or valid UTF8 are dropped.
Err(_err) => (),
};
}
_ => {
// TODO: Handle other notification types.
info!("notification: {:?}", notification);
}
};
}
})
}
}
impl actix::Supervised for Actor {
// TODO: Implement restarting.
}
impl actix::Actor for Actor {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Context<Self>) {
info!("MqttActor::started");
// Increase mailbox capacity for increased message throughput.
ctx.set_mailbox_capacity(self.configuration.mailbox_capacity);
// MQTT client options.
let options = self.mqtt_client_options();
// Start MQTT client and get notifications receiver.
let (mut mqtt_client, notifications) = rumqtt::MqttClient::start(options).unwrap();
// Spawn thread to iterate over notifications.
let address = ctx.address();
let thread_handle = self.thread_spawn_notifications(notifications, address);
// Subscribe to topic(s).
for (_i, x) in self.configuration.topics.iter().enumerate() {
mqtt_client
.subscribe(x.topic.to_owned(), rumqtt::QoS::from_u8(x.qos).unwrap())
.unwrap()
}
// Set client and thread handle in actor state to prevent drop.
self.mqtt_client = Some(mqtt_client);
self.thread_handle = Some(thread_handle);
}
// TODO: Implement signals handling for graceful actor stop.
fn stopping(&mut self, _ctx: &mut Context<Self>) -> Running {
info!("MqttActor::stopping");
let client = self.mqtt_client.as_mut().unwrap();
client.pause().unwrap();
Running::Stop
}
fn stopped(&mut self, _ctx: &mut Context<Self>) {
info!("MqttActor::stopped");
}
}
/// MQTT publish payload type.
#[derive(Debug, Serialize, Deserialize)]
pub enum PublishPayload {
Json(serde_json::Value),
Text(String),
}
/// MQTT publish type.
#[derive(Debug, Serialize, Deserialize)]
pub struct Publish {
/// Message topic.
pub topic: String,
/// Message payload, JSON or text.
pub payload: PublishPayload,
/// Message quality of service.
pub qos: u8,
/// Message duplicate flag.
pub duplicate: bool,
/// Message retain flag.
pub retain: bool,
/// Message packet identifier.
pub pkid: Option<u16>,
}
impl Publish {
/// Try to convert `mqtt311::Publish` to `Publish`.
pub fn from_mqtt311_publish(publish: mqtt311::Publish) -> Result<Publish, Error> {
// First try to deserialise to JSON.
let payload = match serde_json::from_slice(publish.payload.as_ref()) {
Ok(value) => Ok(PublishPayload::Json(value)),
// If deserialisation fails, try to parse UTF8 string.
Err(_err) => match String::from_utf8(publish.payload.to_vec()) {
Ok(value) => Ok(PublishPayload::Text(value)),
Err(err) => Err(Error::FromUtf8(err)),
},
}?;
let pkid = match publish.pkid {
Some(pkid) => Some(pkid.0),
None => None,
};
Ok(Publish {
topic: publish.topic_name,
payload,
qos: publish.qos.to_u8(),
duplicate: publish.dup,
retain: publish.retain,
pkid,
})
}
}
impl actix::Message for Publish {
type Result = Result<(), Error>;
}
impl actix::Handler<Publish> for Actor {
type Result = ResponseActFuture<Self, (), Error>;
fn handle(&mut self, _msg: Publish, _ctx: &mut Context<Self>) -> Self::Result {
// TODO: Better metrics collection.
// info!("publish: {:?}", msg);
let counter = self.publish_counter.fetch_add(1, Ordering::Relaxed);
if counter % 1000 == 0 {
info!("publish_counter: {}", counter);
}
// TODO: Implement parsing/messages to other actors.
// TODO: How to handle errors in handler.
let placeholder = future::ok(());
let wrapped = actix::fut::wrap_future(placeholder);
Box::new(wrapped)
}
} |
does anyone have an example of running both actix and rumqtt?
The text was updated successfully, but these errors were encountered: