Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/command metadata #9

Merged
merged 9 commits into from
Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
* All registered command topics are now published upon connection with the broker or when the
command is registered.

### Fixed
* [#3](https://github.com/quartiq/minireq/issues/3) Fixed an issue where large responses would trigger an internal panic
* [#7](https://github.com/quartiq/minireq/issues/7) Fixed serialization of responses so they are readable
Expand Down
213 changes: 166 additions & 47 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,14 @@ use minimq::{

use serde_json_core::heapless::{String, Vec};

use log::{info, warn};
use log::{debug, info, warn};

pub mod response;
pub use response::Response;

// Correlation data for command republishing.
const REPUBLISH_CORRELATION_DATA: Property = Property::CorrelationData("REPUBLISH".as_bytes());

// The maximum topic length of any settings path.
const MAX_TOPIC_LENGTH: usize = 128;

Expand Down Expand Up @@ -113,34 +116,34 @@ impl<E> From<minimq::Error<E>> for Error<E> {
mod sm {
smlang::statemachine! {
transitions: {
*Init + Connected = Connected,
Connected + Subscribed = Active,
*Init + Update [is_connected] / reset = Subscribing,
Subscribing + Update [subscribe] = Republishing,
Republishing + Update [republish] = Active,

// We can always begin republishing again from the active processing state.
Active + PendingRepublish = Republishing,

// All states can reset if the MQTT broker connection is lost.
_ + Reset = Init,
}
}

pub struct Context;

impl StateMachineContext for Context {}
}

type Handler<Context, E, const RESPONSE_SIZE: usize> =
fn(&mut Context, &str, &[u8]) -> Result<Response<RESPONSE_SIZE>, Error<E>>;

struct HandlerMeta<Context, E, const RESPONSE_SIZE: usize> {
handler: Handler<Context, E, RESPONSE_SIZE>,
republished: bool,
}

/// MQTT request/response interface.
pub struct Minireq<Context, Stack, Clock, const MESSAGE_SIZE: usize, const NUM_REQUESTS: usize>
where
Stack: TcpClientStack,
Clock: embedded_time::Clock,
{
handlers: heapless::LinearMap<
String<MAX_TOPIC_LENGTH>,
Handler<Context, Stack::Error, MESSAGE_SIZE>,
NUM_REQUESTS,
>,
mqtt: minimq::Minimq<Stack, Clock, MESSAGE_SIZE, 1>,
prefix: String<MAX_TOPIC_LENGTH>,
state: sm::StateMachine<sm::Context>,
machine: sm::StateMachine<MinireqContext<Context, Stack, Clock, MESSAGE_SIZE, NUM_REQUESTS>>,
}

impl<Context, Stack, Clock, const MESSAGE_SIZE: usize, const NUM_REQUESTS: usize>
Expand Down Expand Up @@ -176,14 +179,24 @@ where
let mut prefix: String<MAX_TOPIC_LENGTH> = String::new();
write!(&mut prefix, "{}/command", device_prefix).map_err(|_| Error::PrefixTooLong)?;

Ok(Self {
let context = MinireqContext {
handlers: heapless::LinearMap::default(),
mqtt,
prefix,
state: sm::StateMachine::new(sm::Context),
};

Ok(Self {
machine: sm::StateMachine::new(context),
})
}
}

impl<Context, Stack, Clock, const MESSAGE_SIZE: usize, const NUM_REQUESTS: usize>
Minireq<Context, Stack, Clock, MESSAGE_SIZE, NUM_REQUESTS>
where
Stack: TcpClientStack,
Clock: embedded_time::Clock,
{
/// Associate a handler to be called when receiving the specified request.
///
/// # Args
Expand All @@ -194,10 +207,28 @@ where
topic: &str,
handler: Handler<Context, Stack::Error, MESSAGE_SIZE>,
) -> Result<bool, Error<Stack::Error>> {
self.handlers
.insert(String::from(topic), handler)
let added = self
.machine
.context_mut()
.handlers
.insert(
String::from(topic),
HandlerMeta {
handler,
republished: false,
},
)
.map(|prev| prev.is_none())
.map_err(|_| Error::RegisterFailed)
.map_err(|_| Error::RegisterFailed)?;

// Force a republish of the newly-registered command after adding it. We ignore failures of
// event processing here since that would imply we are adding the handler before we've even
// gotten to the republish state.
self.machine
.process_event(sm::Events::PendingRepublish)
.ok();

Ok(added)
}

fn _handle_mqtt<F>(&mut self, mut f: F) -> Result<(), Error<Stack::Error>>
Expand All @@ -208,14 +239,23 @@ where
&[u8],
) -> Result<Response<MESSAGE_SIZE>, Error<Stack::Error>>,
{
let Self {
let MinireqContext {
handlers,
mqtt,
prefix,
..
} = self;
} = self.machine.context_mut();

match mqtt.poll(|client, topic, message, properties| {
// If the incoming message has republish correlation data, ignore it.
if properties
.iter()
.any(|&prop| prop == REPUBLISH_CORRELATION_DATA)
{
debug!("Ignoring republish data");
return;
}

let path = match topic.strip_prefix(prefix.as_str()) {
// For paths, we do not want to include the leading slash.
Some(path) => {
Expand All @@ -233,7 +273,7 @@ where

// Perform the action
let response = match handlers.get(&String::from(path)) {
Some(&handler) => f(handler, path, message).unwrap_or_else(Response::error),
Some(meta) => f(meta.handler, path, message).unwrap_or_else(Response::error),
None => Response::custom(-1, "Unregistered request"),
};

Expand Down Expand Up @@ -287,7 +327,7 @@ where
Err(minimq::Error::SessionReset) => {
// Note(unwrap): It's always safe to unwrap the reset event. All states must handle
// it.
self.state.process_event(sm::Events::Reset).unwrap();
self.machine.process_event(sm::Events::Reset).unwrap();
Ok(())
}
Err(other) => Err(Error::Mqtt(other)),
Expand All @@ -310,33 +350,112 @@ where
&[u8],
) -> Result<Response<MESSAGE_SIZE>, Error<Stack::Error>>,
{
if !self.mqtt.client.is_connected() {
if !self.machine.context_mut().mqtt.client.is_connected() {
// Note(unwrap): It's always safe to unwrap the reset event. All states must handle it.
self.state.process_event(sm::Events::Reset).unwrap();
self.machine.process_event(sm::Events::Reset).unwrap();
}

match *self.state.state() {
sm::States::Init => {
if self.mqtt.client.is_connected() {
// Note(unwrap): It's always safe to process this event in the INIT state.
self.state.process_event(sm::Events::Connected).unwrap();
}
}
sm::States::Connected => {
// Note(unwrap): We ensure that this storage is always sufficiently large to store
// the wildcard post-fix for MQTT.
let mut prefix: String<{ MAX_TOPIC_LENGTH + 2 }> =
String::from(self.prefix.as_str());
prefix.push_str("/#").unwrap();

if self.mqtt.client.subscribe(&prefix, &[]).is_ok() {
// Note(unwrap): It is always safe to process a Subscribed event in this state.
self.state.process_event(sm::Events::Subscribed).unwrap();
}
}
sm::States::Active => {}
}
self.machine.process_event(sm::Events::Update).ok();

self._handle_mqtt(f)
}
}

struct MinireqContext<Context, Stack, Clock, const MESSAGE_SIZE: usize, const NUM_REQUESTS: usize>
where
Stack: TcpClientStack,
Clock: embedded_time::Clock,
{
handlers: heapless::LinearMap<
String<MAX_TOPIC_LENGTH>,
HandlerMeta<Context, Stack::Error, MESSAGE_SIZE>,
NUM_REQUESTS,
>,
mqtt: minimq::Minimq<Stack, Clock, MESSAGE_SIZE, 1>,
prefix: String<MAX_TOPIC_LENGTH>,
}

impl<Context, Stack, Clock, const MESSAGE_SIZE: usize, const NUM_REQUESTS: usize>
sm::StateMachineContext for MinireqContext<Context, Stack, Clock, MESSAGE_SIZE, NUM_REQUESTS>
where
Stack: TcpClientStack,
Clock: embedded_time::Clock,
{
/// Reset the republish state of all of the handlers.
fn reset(&mut self) {
for HandlerMeta {
ref mut republished,
..
} in self.handlers.values_mut()
{
*republished = false;
}
}

/// Guard to handle subscription to the command prefix.
///
/// # Returns
/// Error if the command prefix has not yet been subscribed to.
fn subscribe(&mut self) -> Result<(), ()> {
// Note(unwrap): We ensure that this storage is always sufficiently large to store
// the wildcard post-fix for MQTT.
let mut prefix: String<{ MAX_TOPIC_LENGTH + 2 }> = String::from(self.prefix.as_str());
prefix.push_str("/#").unwrap();

self.mqtt.client.subscribe(&prefix, &[]).map_err(|_| ())
}

/// Guard to check for an MQTT broker connection.
///
/// # Returns
/// Ok if the MQTT broker is connected, false otherwise.
fn is_connected(&mut self) -> Result<(), ()> {
if self.mqtt.client.is_connected() {
Ok(())
} else {
Err(())
}
}

/// Guard to handle republishing all of the command information.
///
/// # Returns
/// Ok if all command information has been republished. Error if there are still more to be
/// published.
fn republish(&mut self) -> Result<(), ()> {
let MinireqContext {
mqtt,
handlers,
prefix,
..
} = self;

for (command_prefix, HandlerMeta { republished, .. }) in handlers
.iter_mut()
.filter(|(_, HandlerMeta { republished, .. })| !republished)
{
// Note(unwrap): The unwrap cannot fail because of restrictions on the max topic
// length.
let mut topic: String<{ 2 * MAX_TOPIC_LENGTH + 1 }> = String::from(prefix.as_str());
topic.push_str("/").unwrap();
topic.push_str(command_prefix).unwrap();

mqtt.client
.publish(
&topic,
// Empty payload would correspond to deleting a retained message.
"{}".as_bytes(),
ryan-summers marked this conversation as resolved.
Show resolved Hide resolved
// TODO: When Minimq supports more QoS levels, this should be increased to
// ensure that the client has received it at least once.
QoS::AtMostOnce,
Retain::Retained,
&[REPUBLISH_CORRELATION_DATA],
)
.map_err(|_| ())?;

*republished = true;
}

Ok(())
}
}