From e3ca52f475fcd1f7fc8944f97a4d63ca2473a718 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Wed, 17 Jul 2024 00:26:44 +0200 Subject: [PATCH] fix: mqtt: remove topic check (#20) This check will never be true, as the topic is always either screens or screens/${id}. Record the topic in the current span, so it's also included as fields in log statements, and log the raw payload in case it couldn't be parsed as a Command. --- Cargo.lock | 12 ++++++++++++ Cargo.nix | 45 +++++++++++++++++++++++++++++++++++++++++++-- Cargo.toml | 1 + src/mqtt.rs | 18 ++++++++++-------- 4 files changed, 66 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 004d638..822ab45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -152,6 +152,17 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bstr" +version = "1.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05efc5cfd9110c8416e471df0e96702d58690178e206e61b7173706673c93706" +dependencies = [ + "memchr", + "regex-automata 0.4.7", + "serde", +] + [[package]] name = "byteorder" version = "1.5.0" @@ -599,6 +610,7 @@ dependencies = [ name = "fossbeamer" version = "0.1.0" dependencies = [ + "bstr", "clap", "color-eyre", "eyre", diff --git a/Cargo.nix b/Cargo.nix index 4742a32..9ada40b 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -475,6 +475,43 @@ rec { ]; }; + "bstr" = rec { + crateName = "bstr"; + version = "1.9.1"; + edition = "2021"; + sha256 = "01ipr5rncw3kf4dyc1p2g00njn1df2b0xpviwhb8830iv77wbvq5"; + authors = [ + "Andrew Gallant " + ]; + dependencies = [ + { + name = "memchr"; + packageId = "memchr"; + usesDefaultFeatures = false; + } + { + name = "regex-automata"; + packageId = "regex-automata 0.4.7"; + optional = true; + usesDefaultFeatures = false; + features = [ "dfa-search" ]; + } + { + name = "serde"; + packageId = "serde"; + optional = true; + usesDefaultFeatures = false; + } + ]; + features = { + "alloc" = [ "memchr/alloc" "serde?/alloc" ]; + "default" = [ "std" "unicode" ]; + "serde" = [ "dep:serde" ]; + "std" = [ "alloc" "memchr/std" "serde?/std" ]; + "unicode" = [ "dep:regex-automata" ]; + }; + resolvedDefaultFeatures = [ "alloc" "default" "std" "unicode" ]; + }; "byteorder" = rec { crateName = "byteorder"; version = "1.5.0"; @@ -1691,6 +1728,10 @@ rec { ]; src = lib.cleanSourceWith { filter = sourceFilter; src = ./.; }; dependencies = [ + { + name = "bstr"; + packageId = "bstr"; + } { name = "clap"; packageId = "clap"; @@ -5022,7 +5063,7 @@ rec { "unicode-script" = [ "regex-syntax?/unicode-script" ]; "unicode-segment" = [ "regex-syntax?/unicode-segment" ]; }; - resolvedDefaultFeatures = [ "alloc" "meta" "nfa-pikevm" "nfa-thompson" "std" "syntax" "unicode-case" "unicode-perl" "unicode-word-boundary" ]; + resolvedDefaultFeatures = [ "alloc" "dfa-search" "meta" "nfa-pikevm" "nfa-thompson" "std" "syntax" "unicode-case" "unicode-perl" "unicode-word-boundary" ]; }; "regex-syntax 0.6.29" = rec { crateName = "regex-syntax"; @@ -5633,7 +5674,7 @@ rec { "derive" = [ "serde_derive" ]; "serde_derive" = [ "dep:serde_derive" ]; }; - resolvedDefaultFeatures = [ "default" "derive" "serde_derive" "std" ]; + resolvedDefaultFeatures = [ "alloc" "default" "derive" "serde_derive" "std" ]; }; "serde_derive" = rec { crateName = "serde_derive"; diff --git a/Cargo.toml b/Cargo.toml index c4d9156..27a0fa1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +bstr = "1.9.1" clap = { version = "4.5.9", features = ["derive"] } color-eyre = "0.6.3" eyre = "0.6.12" diff --git a/src/mqtt.rs b/src/mqtt.rs index 1ad400a..0fd319a 100644 --- a/src/mqtt.rs +++ b/src/mqtt.rs @@ -1,8 +1,8 @@ -use std::{sync::mpsc::Sender, thread, time::Duration}; - +use bstr::BStr; use fossbeamer::Command; use rumqttc::{Client, ClientError, MqttOptions, Packet, Publish}; -use tracing::{debug, warn}; +use std::{sync::mpsc::Sender, thread, time::Duration}; +use tracing::{debug, info, warn, Span}; pub(crate) struct Listener { pub id: String, @@ -28,14 +28,16 @@ impl Listener { payload, .. })) => { - if topic == "commands" { - if let Ok(command) = serde_json::from_slice::(&payload) { - debug!(?command, "received command"); + Span::current().record("topic", &topic); + match serde_json::from_slice::(&payload) { + Ok(command) => { + info!(?command, "received command"); self.sender.send(command).unwrap(); } - } else { - debug!(?topic, "received other topic"); + Err(e) => { + warn!(err=%e, payload=%BStr::new(&payload), "received payload that couldn't be parsed"); + } } } rumqttc::Event::Incoming(incoming) => {