Skip to content

Commit

Permalink
Fix subscribe return code in v4 and return requested qos in broker
Browse files Browse the repository at this point in the history
  • Loading branch information
tekjar committed May 13, 2021
1 parent 407a08c commit 3789131
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 42 deletions.
57 changes: 20 additions & 37 deletions mqttbytes/src/v4/suback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ pub struct SubAck {

impl SubAck {
pub fn new(pkid: u16, return_codes: Vec<SubscribeReasonCode>) -> SubAck {
SubAck {
pkid,
return_codes,
}
SubAck { pkid, return_codes }
}

pub fn len(&self) -> usize {
Expand All @@ -39,10 +36,7 @@ impl SubAck {
return_codes.push(return_code.try_into()?);
}

let suback = SubAck {
pkid,
return_codes,
};
let suback = SubAck { pkid, return_codes };

Ok(suback)
}
Expand All @@ -53,47 +47,34 @@ impl SubAck {
let remaining_len_bytes = write_remaining_length(buffer, remaining_len)?;

buffer.put_u16(self.pkid);
let p: Vec<u8> = self.return_codes.iter().map(|code| *code as u8).collect();
let p: Vec<u8> = self
.return_codes
.iter()
.map(|&code| match code {
SubscribeReasonCode::Success(qos) => qos as u8,
SubscribeReasonCode::Failure => 0x80,
})
.collect();
buffer.extend_from_slice(&p);
Ok(1 + remaining_len_bytes + remaining_len)
}
}


#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscribeReasonCode {
QoS0 = 0,
QoS1 = 1,
QoS2 = 2,
Unspecified = 128,
ImplementationSpecific = 131,
NotAuthorized = 135,
TopicFilterInvalid = 143,
PkidInUse = 145,
QuotaExceeded = 151,
SharedSubscriptionsNotSupported = 158,
SubscriptionIdNotSupported = 161,
WildcardSubscriptionsNotSupported = 162,
Success(QoS),
Failure,
}

impl TryFrom<u8> for SubscribeReasonCode {
type Error = crate::Error;

fn try_from(value: u8) -> Result<Self, Self::Error> {
let v = match value {
0 => SubscribeReasonCode::QoS0,
1 => SubscribeReasonCode::QoS1,
2 => SubscribeReasonCode::QoS2,
128 => SubscribeReasonCode::Unspecified,
131 => SubscribeReasonCode::ImplementationSpecific,
135 => SubscribeReasonCode::NotAuthorized,
143 => SubscribeReasonCode::TopicFilterInvalid,
145 => SubscribeReasonCode::PkidInUse,
151 => SubscribeReasonCode::QuotaExceeded,
158 => SubscribeReasonCode::SharedSubscriptionsNotSupported,
161 => SubscribeReasonCode::SubscriptionIdNotSupported,
162 => SubscribeReasonCode::WildcardSubscriptionsNotSupported,
0 => SubscribeReasonCode::Success(QoS::AtMostOnce),
1 => SubscribeReasonCode::Success(QoS::AtLeastOnce),
2 => SubscribeReasonCode::Success(QoS::ExactlyOnce),
128 => SubscribeReasonCode::Failure,
v => return Err(crate::Error::InvalidSubscribeReasonCode(v)),
};

Expand Down Expand Up @@ -126,9 +107,11 @@ mod test {
packet,
SubAck {
pkid: 15,
return_codes: vec![SubscribeReasonCode::QoS1, SubscribeReasonCode::Unspecified,],
return_codes: vec![
SubscribeReasonCode::Success(QoS::AtLeastOnce),
SubscribeReasonCode::Failure,
],
}
);
}
}

11 changes: 6 additions & 5 deletions rumqttlog/src/router/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use thiserror::Error;
use super::connection::ConnectionType;
use super::readyqueue::ReadyQueue;
use super::slab::Slab;
use crate::logs::acks::Acks;
use super::*;
use crate::logs::acks::Acks;

use crate::logs::{ConnectionsLog, DataLog, TopicsLog};
use crate::router::metrics::RouterMetrics;
Expand Down Expand Up @@ -388,10 +388,9 @@ impl Router {
let mut return_codes = Vec::new();
for filter in subscribe.filters.iter() {
if filter.path.starts_with("test") || filter.path.starts_with("$") {
return_codes.push(SubscribeReasonCode::TopicFilterInvalid);
return_codes.push(SubscribeReasonCode::Failure);
} else {
// TODO: Fix subscribe return code
return_codes.push(SubscribeReasonCode::QoS0);
return_codes.push(SubscribeReasonCode::Success(filter.qos));
}
}

Expand Down Expand Up @@ -792,7 +791,9 @@ mod test {
let client_id = &format!("{}", i);
add_new_remote_connection(&mut router, client_id);
add_new_subscription(&mut router, i, "hello/world");
router.data_waiters.register(i, DataRequest::new("hello/world".to_owned(), 1));
router
.data_waiters
.register(i, DataRequest::new("hello/world".to_owned(), 1));
}

let payload = Bytes::from(vec![1, 2, 3]);
Expand Down

0 comments on commit 3789131

Please sign in to comment.