From a52e7d11be75ed6b389b45ffafa1fb058438f2d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Mon, 13 Nov 2023 10:15:19 +0100 Subject: [PATCH] heapless 0.8 changes --- src/net/data_stream.rs | 50 +++++++++++++++++++----------------------- src/net/mod.rs | 4 ++-- src/net/telemetry.rs | 7 +++--- 3 files changed, 28 insertions(+), 33 deletions(-) diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 728bd1234..36b1014d9 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -24,7 +24,8 @@ //! of livestreamed data. use core::mem::MaybeUninit; use heapless::{ - pool::{Box, Init, Pool, Uninit}, + box_pool, + pool::boxed::{Box, BoxBlock}, spsc::{Consumer, Producer, Queue}, }; use num_enum::IntoPrimitive; @@ -53,13 +54,14 @@ const FRAME_SIZE: usize = 1500 - 40 - 8; // allocated frame buffer should fit in the queue. const FRAME_QUEUE_SIZE: usize = FRAME_COUNT * 2; -// Static storage used for a heapless::Pool of frame buffers. -static mut FRAME_DATA: [u8; core::mem::size_of::() - * FRAME_SIZE - * FRAME_COUNT] = [0; core::mem::size_of::() * FRAME_SIZE * FRAME_COUNT]; - type Frame = [MaybeUninit; FRAME_SIZE]; +box_pool!(FRAME_POOL: Frame); + +// Static storage used for a heapless::Pool of frame buffers. +const BLOCK: BoxBlock = BoxBlock::new(); +static mut FRAME_DATA: [BoxBlock; FRAME_COUNT] = [BLOCK; FRAME_COUNT]; + /// Represents the destination for the UDP stream to send data to. /// /// # Miniconf @@ -129,34 +131,32 @@ pub fn setup_streaming( .unwrap(); let (producer, consumer) = queue.split(); - let frame_pool = cortex_m::singleton!(: Pool = Pool::new()).unwrap(); - // Note(unsafe): We guarantee that FRAME_DATA is only accessed once in this function. - let memory = unsafe { &mut FRAME_DATA }; - frame_pool.grow(memory); + let blocks = unsafe { &mut FRAME_DATA }; + for block in blocks { + FRAME_POOL.manage(block); + } - let generator = FrameGenerator::new(producer, frame_pool); + let generator = FrameGenerator::new(producer); - let stream = DataStream::new(stack, consumer, frame_pool); + let stream = DataStream::new(stack, consumer); (generator, stream) } #[derive(Debug)] struct StreamFrame { - buffer: Box, + buffer: Box, offset: usize, batches: u8, } impl StreamFrame { pub fn new( - buffer: Box, + mut buffer: Box, format_id: u8, sequence_number: u32, ) -> Self { - let mut buffer = buffer.init([MaybeUninit::uninit(); FRAME_SIZE]); - for (byte, buf) in MAGIC .to_le_bytes() .iter() @@ -197,20 +197,15 @@ impl StreamFrame { /// The data generator for a stream. pub struct FrameGenerator { queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>, - pool: &'static Pool, current_frame: Option, sequence_number: u32, format: u8, } impl FrameGenerator { - fn new( - queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>, - pool: &'static Pool, - ) -> Self { + fn new(queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>) -> Self { Self { queue, - pool, format: StreamFormat::Unknown.into(), current_frame: None, sequence_number: 0, @@ -242,7 +237,9 @@ impl FrameGenerator { self.sequence_number = self.sequence_number.wrapping_add(1); if self.current_frame.is_none() { - if let Some(buffer) = self.pool.alloc() { + if let Ok(buffer) = + FRAME_POOL.alloc([MaybeUninit::uninit(); FRAME_SIZE]) + { self.current_frame.replace(StreamFrame::new( buffer, self.format, @@ -276,7 +273,6 @@ pub struct DataStream { stack: NetworkReference, socket: Option<::UdpSocket>, queue: Consumer<'static, StreamFrame, FRAME_QUEUE_SIZE>, - frame_pool: &'static Pool, remote: SocketAddr, } @@ -290,14 +286,12 @@ impl DataStream { fn new( stack: NetworkReference, consumer: Consumer<'static, StreamFrame, FRAME_QUEUE_SIZE>, - frame_pool: &'static Pool, ) -> Self { Self { stack, socket: None, remote: StreamTarget::default().into(), queue: consumer, - frame_pool, } } @@ -350,7 +344,7 @@ impl DataStream { if self.open().is_ok() { // If we just successfully opened the socket, flush old data from queue. while let Some(frame) = self.queue.dequeue() { - self.frame_pool.free(frame.buffer); + drop(frame.buffer); } } } @@ -365,7 +359,7 @@ impl DataStream { ) }; self.stack.send(handle, data).ok(); - self.frame_pool.free(frame.buffer) + drop(frame.buffer); } } } diff --git a/src/net/mod.rs b/src/net/mod.rs index 7e3b7b66b..1394251e6 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -205,9 +205,9 @@ where // `settings_path` has to be at least as large as `miniconf::mqtt_client::MAX_TOPIC_LENGTH`. let mut settings_path: String<128> = String::new(); match self.miniconf.handled_update(|path, old, new| { - settings_path = path.into(); + settings_path = path.try_into().or(Err("path too long"))?; *old = new.clone(); - Result::<(), &'static str>::Ok(()) + Result::<_, &'static str>::Ok(()) }) { Ok(true) => NetworkState::SettingsChanged(settings_path), _ => poll_result, diff --git a/src/net/telemetry.rs b/src/net/telemetry.rs index e5efd28ef..a1d6386ae 100644 --- a/src/net/telemetry.rs +++ b/src/net/telemetry.rs @@ -10,7 +10,7 @@ //! sampling frequency. Instead, the raw codes are stored and the telemetry is generated as //! required immediately before transmission. This ensures that any slower computation required //! for unit conversion can be off-loaded to lower priority tasks. -use heapless::{String, Vec}; +use heapless::String; use serde::Serialize; use super::NetworkReference; @@ -115,7 +115,8 @@ impl TelemetryClient { >, prefix: &str, ) -> Self { - let mut telemetry_topic: String<128> = String::from(prefix); + let mut telemetry_topic: String<128> = + String::try_from(prefix).unwrap(); telemetry_topic.push_str("/telemetry").unwrap(); Self { @@ -134,7 +135,7 @@ impl TelemetryClient { /// # Args /// * `telemetry` - The telemetry to report pub fn publish(&mut self, telemetry: &T) { - let telemetry: Vec = + let telemetry: serde_json_core::heapless::Vec = serde_json_core::to_vec(telemetry).unwrap(); self.mqtt .client()