Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
svenrademakers committed Dec 21, 2023
1 parent c72107b commit 9cccd19
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 74 deletions.
13 changes: 6 additions & 7 deletions src/app/bmc_application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ impl BmcApplication {
.write_timeout(database_write_timeout)
.build()
.await?;
let serial = SerialConnections::new()?;

let mut serial = SerialConnections::new()?;
serial.run().await?;

let node_drivers = NodeDrivers::new();

let instance = Self {
Expand Down Expand Up @@ -304,12 +307,8 @@ impl BmcApplication {
Ok(())
}

pub async fn start_serial_workers(&self) -> anyhow::Result<()> {
Ok(self.serial.run().await?)
}

pub async fn serial_read(&self, node: NodeId, encoding: Encoding) -> anyhow::Result<String> {
let bytes = self.serial.read(node).await?;
let bytes = self.serial.dump_buffer(node).await?;

let res = match encoding {
Encoding::Utf8 => String::from_utf8_lossy(&bytes).to_string(),
Expand All @@ -320,7 +319,7 @@ impl BmcApplication {
}

pub async fn serial_write(&self, node: NodeId, data: &[u8]) -> anyhow::Result<()> {
Ok(self.serial.write(node, data).await?)
Ok(self.serial.write(node, data.into()).await?)
}

pub async fn set_node_info(&self, info: SetNodeInfo) -> anyhow::Result<()> {
Expand Down
235 changes: 169 additions & 66 deletions src/hal/serial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,134 +12,235 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Handlers for UART connections to/from nodes
use std::error::Error;
use std::fmt::Display;
use std::io::{Read, Write};
use std::sync::Arc;

use super::NodeId;
use anyhow::Result;
use bytes::{Bytes, BytesMut};
use bytes::BytesMut;
use circular_buffer::CircularBuffer;
use futures::{SinkExt, StreamExt};
use tokio::sync::mpsc::{channel, Sender};
use tokio::sync::Mutex;
use std::io::Write;
use std::sync::Arc;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{self, WeakSender};
use tokio::sync::{broadcast, Mutex};
use tokio_serial::{DataBits, Parity, SerialPortBuilderExt, StopBits};
use tokio_util::codec::{BytesCodec, Decoder};

use super::NodeId;

const OUTPUT_BUF_SIZE: usize = 16 * 1024;

type RingBuffer = CircularBuffer<OUTPUT_BUF_SIZE, u8>;

#[derive(Debug)]
pub struct SerialConnections {
handlers: Vec<Mutex<Handler>>,
handlers: Vec<Handler>,
writers: Option<Vec<WeakSender<BytesMut>>>,
}

impl SerialConnections {
pub fn new() -> Result<Self> {
let paths = ["/dev/ttyS2", "/dev/ttyS1", "/dev/ttyS4", "/dev/ttyS5"];

let handlers: Vec<Mutex<Handler>> = paths
let handlers: Vec<Handler> = paths
.iter()
.enumerate()
.map(|(i, path)| Mutex::new(Handler::new(i + 1, path)))
.map(|(i, path)| {
Handler::new(
i + 1,
path,
115200,
DataBits::Eight,
Parity::None,
StopBits::One,
)
})
.collect();

Ok(SerialConnections { handlers })
Ok(SerialConnections {
handlers: handlers.into(),

Check failure on line 59 in src/hal/serial.rs

View workflow job for this annotation

GitHub Actions / clippy

useless conversion to the same type: `std::vec::Vec<hal::serial::Handler>`

error: useless conversion to the same type: `std::vec::Vec<hal::serial::Handler>` --> src/hal/serial.rs:59:23 | 59 | handlers: handlers.into(), | ^^^^^^^^^^^^^^^ help: consider removing `.into()`: `handlers` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#useless_conversion = note: `-D clippy::useless-conversion` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(clippy::useless_conversion)]`
writers: None,
})
}

pub async fn run(&self) -> Result<(), SerialError> {
for h in &self.handlers {
h.lock().await.start_reader()?;
pub async fn run(&mut self) -> Result<(), SerialError> {
let mut writers = Vec::new();
for h in &mut self.handlers {
let writer = h.run_handler()?;
writers.push(writer);
}

let _ = self.writers.insert(writers);
Ok(())
}

pub async fn open_channel(

Check warning on line 75 in src/hal/serial.rs

View workflow job for this annotation

GitHub Actions / cargo-test

methods `open_channel` and `get_state_handlers` are never used

Check failure on line 75 in src/hal/serial.rs

View workflow job for this annotation

GitHub Actions / clippy

methods `open_channel` and `get_state_handlers` are never used

error: methods `open_channel` and `get_state_handlers` are never used --> src/hal/serial.rs:75:18 | 39 | impl SerialConnections { | ---------------------- methods in this implementation ... 75 | pub async fn open_channel( | ^^^^^^^^^^^^ ... 102 | pub fn get_state_handlers(&self) -> Vec<HandlerState> { | ^^^^^^^^^^^^^^^^^^ | = note: `-D dead-code` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(dead_code)]`
&self,
node: NodeId,
) -> Result<(BytesMut, SerialChannel), SerialError> {
let handler = &self.handlers[node as usize];
let buffer = handler.read_whole_buffer().await?;
Ok((buffer, handler.open_channel()?))
}

pub async fn dump_buffer(&self, node: NodeId) -> Result<BytesMut, SerialError> {
self.handlers[node as usize].read_whole_buffer().await
}

pub async fn write(&self, node: NodeId, bytes: BytesMut) -> Result<(), SerialError> {
let writer = self
.writers
.as_ref()
.ok_or(SerialError::NotStarted)?
.get(node as usize)
.expect("should have at least 4 writers initialized")
.upgrade()
.ok_or(SerialError::Stopped)?;

writer.send(bytes).await?;
Ok(())
}

pub async fn read(&self, node: NodeId) -> Result<Bytes, SerialError> {
let idx = node as usize;
self.handlers[idx].lock().await.read().await
pub fn get_state_handlers(&self) -> Vec<HandlerState> {

Check warning on line 102 in src/hal/serial.rs

View workflow job for this annotation

GitHub Actions / cargo-test

type `HandlerState` is more private than the item `serial::SerialConnections::get_state_handlers`

Check failure on line 102 in src/hal/serial.rs

View workflow job for this annotation

GitHub Actions / clippy

type `hal::serial::HandlerState` is more private than the item `hal::serial::SerialConnections::get_state_handlers`

error: type `hal::serial::HandlerState` is more private than the item `hal::serial::SerialConnections::get_state_handlers` --> src/hal/serial.rs:102:5 | 102 | pub fn get_state_handlers(&self) -> Vec<HandlerState> { | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ method `hal::serial::SerialConnections::get_state_handlers` is reachable at visibility `pub(crate)` | note: but type `hal::serial::HandlerState` is only usable at visibility `pub(self)` --> src/hal/serial.rs:146:1 | 146 | enum HandlerState { | ^^^^^^^^^^^^^^^^^ = note: `-D private-interfaces` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(private_interfaces)]`
assert!(self.handlers.len() == 4);
self.handlers.iter().map(Handler::get_state).collect()
}
}

pub async fn write<B: Into<BytesMut>>(&self, node: NodeId, data: B) -> Result<(), SerialError> {
let idx = node as usize;
self.handlers[idx].lock().await.write(data.into()).await
pub struct SerialChannel {
inner: (broadcast::Receiver<BytesMut>, mpsc::Sender<BytesMut>),

Check warning on line 109 in src/hal/serial.rs

View workflow job for this annotation

GitHub Actions / cargo-test

field `inner` is never read

Check failure on line 109 in src/hal/serial.rs

View workflow job for this annotation

GitHub Actions / clippy

field `inner` is never read

error: field `inner` is never read --> src/hal/serial.rs:109:5 | 108 | pub struct SerialChannel { | ------------- field in this struct 109 | inner: (broadcast::Receiver<BytesMut>, mpsc::Sender<BytesMut>), | ^^^^^
}

impl AsyncRead for SerialChannel {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,

Check warning on line 115 in src/hal/serial.rs

View workflow job for this annotation

GitHub Actions / cargo-test

unused variable: `cx`

Check failure on line 115 in src/hal/serial.rs

View workflow job for this annotation

GitHub Actions / clippy

unused variable: `cx`

error: unused variable: `cx` --> src/hal/serial.rs:115:9 | 115 | cx: &mut std::task::Context<'_>, | ^^ help: if this is intentional, prefix it with an underscore: `_cx` | = note: `-D unused-variables` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(unused_variables)]`
buf: &mut tokio::io::ReadBuf<'_>,

Check warning on line 116 in src/hal/serial.rs

View workflow job for this annotation

GitHub Actions / cargo-test

unused variable: `buf`

Check failure on line 116 in src/hal/serial.rs

View workflow job for this annotation

GitHub Actions / clippy

unused variable: `buf`

error: unused variable: `buf` --> src/hal/serial.rs:116:9 | 116 | buf: &mut tokio::io::ReadBuf<'_>, | ^^^ help: if this is intentional, prefix it with an underscore: `_buf`
) -> std::task::Poll<std::io::Result<()>> {
todo!()
}
}

impl AsyncWrite for SerialChannel {
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,

Check warning on line 125 in src/hal/serial.rs

View workflow job for this annotation

GitHub Actions / cargo-test

unused variable: `cx`

Check failure on line 125 in src/hal/serial.rs

View workflow job for this annotation

GitHub Actions / clippy

unused variable: `cx`

error: unused variable: `cx` --> src/hal/serial.rs:125:9 | 125 | cx: &mut std::task::Context<'_>, | ^^ help: if this is intentional, prefix it with an underscore: `_cx`
buf: &[u8],

Check warning on line 126 in src/hal/serial.rs

View workflow job for this annotation

GitHub Actions / cargo-test

unused variable: `buf`

Check failure on line 126 in src/hal/serial.rs

View workflow job for this annotation

GitHub Actions / clippy

unused variable: `buf`

error: unused variable: `buf` --> src/hal/serial.rs:126:9 | 126 | buf: &[u8], | ^^^ help: if this is intentional, prefix it with an underscore: `_buf`
) -> std::task::Poll<std::result::Result<usize, std::io::Error>> {
todo!()
}

fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,

Check warning on line 133 in src/hal/serial.rs

View workflow job for this annotation

GitHub Actions / cargo-test

unused variable: `cx`

Check failure on line 133 in src/hal/serial.rs

View workflow job for this annotation

GitHub Actions / clippy

unused variable: `cx`

error: unused variable: `cx` --> src/hal/serial.rs:133:9 | 133 | cx: &mut std::task::Context<'_>, | ^^ help: if this is intentional, prefix it with an underscore: `_cx`
) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
todo!()
}

fn poll_shutdown(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,

Check warning on line 140 in src/hal/serial.rs

View workflow job for this annotation

GitHub Actions / cargo-test

unused variable: `cx`

Check failure on line 140 in src/hal/serial.rs

View workflow job for this annotation

GitHub Actions / clippy

unused variable: `cx`

error: unused variable: `cx` --> src/hal/serial.rs:140:9 | 140 | cx: &mut std::task::Context<'_>, | ^^ help: if this is intentional, prefix it with an underscore: `_cx`
) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
todo!()
}
}

enum HandlerState {

Check failure on line 146 in src/hal/serial.rs

View workflow job for this annotation

GitHub Actions / clippy

enum `HandlerState` is never used

error: enum `HandlerState` is never used --> src/hal/serial.rs:146:6 | 146 | enum HandlerState { | ^^^^^^^^^^^^
initialized,

Check warning on line 147 in src/hal/serial.rs

View workflow job for this annotation

GitHub Actions / cargo-test

variant `initialized` should have an upper camel case name

Check failure on line 147 in src/hal/serial.rs

View workflow job for this annotation

GitHub Actions / clippy

variant `initialized` should have an upper camel case name

error: variant `initialized` should have an upper camel case name --> src/hal/serial.rs:147:5 | 147 | initialized, | ^^^^^^^^^^^ help: convert the identifier to upper camel case (notice the capitalization): `Initialized` | = note: `-D non-camel-case-types` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(non_camel_case_types)]`
Running,
Stopped,
}

#[derive(Debug)]
struct Handler {
node: usize,
baud_rate: u32,
data_bits: DataBits,
parity: Parity,
stop_bits: StopBits,
path: &'static str,
ring_buffer: Arc<Mutex<Box<RingBuffer>>>,
worker_context: Option<Sender<BytesMut>>,
worker_context: Option<(broadcast::Sender<BytesMut>, mpsc::Sender<BytesMut>)>,
}

impl Handler {
fn new(node: usize, path: &'static str) -> Self {
fn new(
node: usize,
path: &'static str,
baud_rate: u32,
data_bits: DataBits,
parity: Parity,
stop_bits: StopBits,
) -> Self {
Handler {
node,
path,
baud_rate,
data_bits,
parity,
stop_bits,
ring_buffer: Arc::new(Mutex::new(RingBuffer::boxed())),
worker_context: None,
}
}

async fn write<B: Into<BytesMut>>(&self, data: B) -> Result<(), SerialError> {
let Some(sender) = &self.worker_context else {
pub fn get_state(&self) -> HandlerState {

Check failure on line 185 in src/hal/serial.rs

View workflow job for this annotation

GitHub Actions / clippy

methods `get_state` and `open_channel` are never used

error: methods `get_state` and `open_channel` are never used --> src/hal/serial.rs:185:12 | 164 | impl Handler { | ------------ methods in this implementation ... 185 | pub fn get_state(&self) -> HandlerState { | ^^^^^^^^^ ... 193 | pub fn open_channel(&self) -> Result<SerialChannel, SerialError> { | ^^^^^^^^^^^^
match &self.worker_context {
Some((_, sender)) if sender.is_closed() => HandlerState::Stopped,
Some(_) => HandlerState::Running,
None => HandlerState::initialized,
}
}

pub fn open_channel(&self) -> Result<SerialChannel, SerialError> {
let Some((read_sender, write_sender)) = &self.worker_context else {
return Err(SerialError::NotStarted);
};

sender
.send(data.into())
.await
.map_err(|e| SerialError::InternalError(e.to_string()))
Ok(SerialChannel {
inner: (read_sender.subscribe(), write_sender.clone()),
})
}

async fn read(&self) -> Result<Bytes, SerialError> {
/// This function returns all the cached data.
/// Time complexity: O(N)
async fn read_whole_buffer(&self) -> Result<BytesMut, SerialError> {
if self.worker_context.is_none() {
return Err(SerialError::NotStarted);
};

let mut rb = self.ring_buffer.lock().await;
let mut buf = vec![0; rb.len()];

rb.read(&mut buf)
.map_err(|e| SerialError::InternalError(format!("failed to read: {}", e)))?;

Ok(buf.into())
let mut bytes = BytesMut::new();
bytes.extend_from_slice(rb.make_contiguous());
Ok(bytes)
}

fn start_reader(&mut self) -> Result<(), SerialError> {
fn run_handler(&mut self) -> Result<WeakSender<BytesMut>, SerialError> {
if self.worker_context.take().is_some() {
return Err(SerialError::AlreadyRunning);
};

let baud_rate = 115200;
let mut port = tokio_serial::new(self.path, baud_rate)
.data_bits(DataBits::Eight)
.parity(Parity::None)
.stop_bits(StopBits::One)
.open_native_async()
.map_err(|e| SerialError::InternalError(e.to_string()))?;
let mut port = tokio_serial::new(self.path, self.baud_rate)
.data_bits(self.data_bits)
.parity(self.parity)
.stop_bits(self.stop_bits)
.open_native_async()?;

// Disable exclusivity of the port to allow other applications to open it.
// Not a reason to abort if we can't.
if let Err(e) = port.set_exclusive(false) {
log::warn!("Unable to set exclusivity of port {}: {}", self.path, e);
}

let (sender, mut receiver) = channel::<BytesMut>(64);
self.worker_context = Some(sender);
let (read_sender, _) = broadcast::channel::<BytesMut>(8);
let (write_sender, mut write_receiver) = mpsc::channel::<BytesMut>(8);
self.worker_context = Some((read_sender.clone(), write_sender.clone()));

let node = self.node;
let buffer = self.ring_buffer.clone();
tokio::spawn(async move {
let (mut sink, mut stream) = BytesCodec::new().framed(port).split();
loop {
tokio::select! {
res = receiver.recv() => {
res = write_receiver.recv() => {
let Some(data) = res else {
log::error!("error sending data to uart");
break;
Expand All @@ -161,35 +262,37 @@ impl Handler {
};

// Implementation is actually infallible in the currently used v0.1.3
let Ok(_) = buffer.lock().await.write(&bytes) else {
if buffer.lock().await.write(&bytes).is_err() {
log::error!("Failed to write to buffer of node {}", node);
break;
};

if let Err(e) = read_sender.send(bytes) {
log::error!("broadcast error: {:#}", e);
break;
}
},
}
}
log::warn!("exiting serial worker");
});

Ok(())
Ok(write_sender.downgrade())
}
}

#[derive(Debug)]
#[derive(Error, Debug)]
pub enum SerialError {
#[error("serial worker not started")]
NotStarted,
#[error("already running")]
AlreadyRunning,
InternalError(String),
}

impl Error for SerialError {}

impl Display for SerialError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SerialError::NotStarted => write!(f, "serial worker not started"),
SerialError::AlreadyRunning => write!(f, "already running"),
SerialError::InternalError(e) => e.fmt(f),
}
}
#[error("Stopped")]
Stopped,
#[error(transparent)]
SendError(#[from] SendError<bytes::BytesMut>),
#[error(transparent)]
SerialError(#[from] tokio_serial::Error),
#[error(transparent)]
IoError(#[from] std::io::Error),
}
1 change: 0 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ async fn main() -> anyhow::Result<()> {
let config = Config::try_from(config_path()).context("Error parsing config file")?;
let (tls, tls6) = load_tls_config(&config)?;
let bmc = Data::new(BmcApplication::new(config.store.write_timeout).await?);
bmc.start_serial_workers().await?;

run_event_listener(bmc.clone().into_inner())?;
let streaming_data_service = Data::new(StreamingDataService::new());
Expand Down

0 comments on commit 9cccd19

Please sign in to comment.