diff --git a/Cargo.lock b/Cargo.lock index 5ed9531..c04c92c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -621,6 +621,7 @@ dependencies = [ "color-eyre", "edid-rs", "eyre", + "parking_lot", "rumqttc", "serde", "serde_json", diff --git a/Cargo.nix b/Cargo.nix index 3a624ce..0eab298 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -1760,6 +1760,10 @@ rec { name = "eyre"; packageId = "eyre"; } + { + name = "parking_lot"; + packageId = "parking_lot"; + } { name = "rumqttc"; packageId = "rumqttc"; diff --git a/Cargo.toml b/Cargo.toml index 9521850..abde778 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ clap = { version = "4.5.9", features = ["derive"] } color-eyre = "0.6.3" edid-rs = "0.1.0" eyre = "0.6.12" +parking_lot = "0.12.3" rumqttc = "0.24.0" serde = { version = "1.0.204", features = ["derive"] } serde_json = "1.0.120" diff --git a/src/browser.rs b/src/browser.rs index b87f17c..794b6f2 100644 --- a/src/browser.rs +++ b/src/browser.rs @@ -9,7 +9,7 @@ use tracing::{debug, warn}; use wry::WebViewBuilder; /// Commands to control the browser instance. -#[derive(Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] #[serde(tag = "kind")] pub enum Command { LoadUrl { url: String }, diff --git a/src/main.rs b/src/main.rs index 67bd485..f2c4f34 100644 --- a/src/main.rs +++ b/src/main.rs @@ -25,6 +25,9 @@ struct Cli { #[arg(long = "default-config")] default_config_path: Option, + + #[arg(long)] + mqtt_topic_prefix: Option, } fn main() -> color_eyre::eyre::Result<()> { @@ -121,14 +124,20 @@ fn main() -> color_eyre::eyre::Result<()> { let (tx, rx) = channel(); - let listener = mqtt::Listener { - id: config.id.unwrap_or(display_info.serial), - host: config.host, - port: config.port, - sender: tx, - }; + let listener = mqtt::Listener::new( + config.id.unwrap_or_else(|| display_info.serial.clone()), + config.host, + config.port, + cli.mqtt_topic_prefix + .unwrap_or_else(|| "screens".to_string()), + )?; + + // register our display + // FUTURWORK: multiple display support + listener + .add_display(&display_info, tx) + .context("adding display")?; - listener.start().context("starting the mqtt listener")?; fossbeamer::spawn_browser(cli.url, rx)?; Ok(()) diff --git a/src/mqtt.rs b/src/mqtt.rs index 0fd319a..8f2689c 100644 --- a/src/mqtt.rs +++ b/src/mqtt.rs @@ -1,61 +1,139 @@ use bstr::BStr; -use fossbeamer::Command; -use rumqttc::{Client, ClientError, MqttOptions, Packet, Publish}; -use std::{sync::mpsc::Sender, thread, time::Duration}; +use eyre::Context; +use fossbeamer::{Command, Info}; +use parking_lot::RwLock; +use rumqttc::{Client, MqttOptions, Packet, Publish}; +use std::{ + collections::HashMap, + sync::{mpsc::Sender, Arc}, + thread, + time::Duration, +}; use tracing::{debug, info, warn, Span}; +/// Maintains a connection to an MQTT broker. pub(crate) struct Listener { - pub id: String, - pub host: String, - pub port: u16, - pub sender: Sender, + /// The MQTT client + client: rumqttc::Client, + + /// The topic that's prepended before IDs in the topic + topic_prefix: String, + + /// Senders expecting commands to be sent to, keyed by their topic. + senders: Arc>>>, } impl Listener { - pub(crate) fn start(self) -> Result<(), ClientError> { - let (client, mut connection) = - Client::new(MqttOptions::new(&self.id, self.host, self.port), 64); - - client.subscribe("screens", rumqttc::QoS::AtLeastOnce)?; - client.subscribe(format!("screens/{}", self.id), rumqttc::QoS::AtLeastOnce)?; - - thread::spawn(move || { - for event in connection.iter() { - match event { - Ok(event) => match event { - rumqttc::Event::Incoming(Packet::Publish(Publish { - topic, - payload, - .. - })) => { - Span::current().record("topic", &topic); - match serde_json::from_slice::(&payload) { - Ok(command) => { - info!(?command, "received command"); - - self.sender.send(command).unwrap(); - } - Err(e) => { - warn!(err=%e, payload=%BStr::new(&payload), "received payload that couldn't be parsed"); + /// Prepares a connection to the broker, and spawns off a thread dealing + /// with received messages. + /// It spawns off a thread relaying messages to the Senders added in a + /// [add_display] call. + pub fn new( + id: impl Into, + host: impl Into, + port: u16, + topic_prefix: impl Into + Clone, + ) -> eyre::Result { + let (client, mut connection) = Client::new(MqttOptions::new(id, host, port), 64); + + let senders = Arc::new(RwLock::new( + HashMap::>::new(), + )); + + let topic_prefix: String = topic_prefix.into(); + let catchall_topic = topic_prefix.clone(); + + thread::spawn({ + let senders = senders.clone(); + let catchall_topic = catchall_topic.clone(); + move || { + for event in connection.iter() { + match event { + Ok(event) => match event { + rumqttc::Event::Incoming(Packet::Publish(Publish { + topic, + payload, + .. + })) => { + Span::current().record("topic", &topic); + + // parse the command + let command = match serde_json::from_slice::(&payload) { + Ok(command) => { + info!(?command, "received command"); + command + } + Err(e) => { + warn!(err=%e, payload=%BStr::new(&payload), "received payload that couldn't be parsed"); + continue; + } + }; + + if topic == catchall_topic { + for (_topic, sender) in senders.read().iter() { + if let Err(e) = sender.send(command.clone()) { + warn!(err=%e, "unable to send command to tx"); + } + } + } else { + match senders.read().get(&topic) { + None => { + warn!("couldn't find topic"); + continue; + } + Some(tx) => { + if let Err(e) = tx.send(command) { + warn!(err=%e, "unable to send command to tx"); + } + } + } } } + rumqttc::Event::Incoming(incoming) => { + debug!(?incoming, "other incoming event"); + } + rumqttc::Event::Outgoing(out) => { + debug!(?out, "outgoing event"); + } + }, + Err(e) => { + warn!(err=%e, "connection error"); + // sleep a bit + std::thread::sleep(Duration::from_secs(5)); } - rumqttc::Event::Incoming(incoming) => { - debug!(?incoming, "other incoming event"); - } - rumqttc::Event::Outgoing(out) => { - debug!(?out, "outgoing event"); - } - }, - Err(e) => { - warn!(err=%e, "connection error"); - // sleep a bit - std::thread::sleep(Duration::from_secs(5)); } } } }); + // subscribe to the catchall + client + .subscribe(catchall_topic, rumqttc::QoS::AtLeastOnce) + .context("subscribing to catchall topic")?; + + Ok(Self { + client, + senders, + topic_prefix, + }) + } + + /// Register a new display, using the passed display_info. + /// `set` requests received are sent to the passed channel. + pub fn add_display( + &self, + display_info: &Info, + tx: Sender, + ) -> eyre::Result<()> { + let k = &display_info.serial; + let topic_str = format!("{}/{}", self.topic_prefix, k); + + self.client + .subscribe(&topic_str, rumqttc::QoS::AtLeastOnce) + .context("subscribing to topic")?; + + self.senders.write().insert(topic_str, tx); + Ok(()) } }