From 2445d8ae4446b6558feea420bbcd089e687cf37a Mon Sep 17 00:00:00 2001 From: Sven Rademakers Date: Thu, 21 Dec 2023 10:40:12 +0000 Subject: [PATCH] WIP --- src/api.rs | 26 +++ src/api/legacy.rs | 76 +----- src/app/bmc_application.rs | 33 +-- src/hal.rs | 5 +- src/hal/serial.rs | 195 ---------------- src/hal/stub/serial.rs | 64 ----- src/main.rs | 7 +- src/serial_service.rs | 167 +++++++++++++ src/serial_service/serial.rs | 74 ++++++ src/serial_service/serial_channel.rs | 55 +++++ src/serial_service/serial_handler.rs | 219 ++++++++++++++++++ .../transfer_context.rs | 4 +- 12 files changed, 553 insertions(+), 372 deletions(-) delete mode 100644 src/hal/serial.rs delete mode 100644 src/hal/stub/serial.rs create mode 100644 src/serial_service.rs create mode 100644 src/serial_service/serial.rs create mode 100644 src/serial_service/serial_channel.rs create mode 100644 src/serial_service/serial_handler.rs diff --git a/src/api.rs b/src/api.rs index f59b8a8..c9b6e05 100644 --- a/src/api.rs +++ b/src/api.rs @@ -13,3 +13,29 @@ // limitations under the License. pub mod into_legacy_response; pub mod legacy; +use self::into_legacy_response::{LegacyResponse, LegacyResult}; +use crate::hal::NodeId; +use actix_web::web; +use std::str::FromStr; + +pub fn get_node_param( + query: &web::Query>, +) -> LegacyResult { + let Some(node_str) = query.get("node") else { + return Err(LegacyResponse::bad_request("Missing `node` parameter")); + }; + + let Ok(node_num) = i32::from_str(node_str) else { + return Err(LegacyResponse::bad_request( + "Parameter `node` is not a number", + )); + }; + + let Ok(node) = node_num.try_into() else { + return Err(LegacyResponse::bad_request( + "Parameter `node` is out of range 0..3 of node IDs", + )); + }; + + Ok(node) +} diff --git a/src/api/legacy.rs b/src/api/legacy.rs index 5d9c6de..08e13f1 100644 --- a/src/api/legacy.rs +++ b/src/api/legacy.rs @@ -14,7 +14,7 @@ //! Routes for legacy API present in versions <= 2.0.0 of the firmware. use crate::api::into_legacy_response::LegacyResponse; use crate::api::into_legacy_response::{LegacyResult, Null}; -use crate::app::bmc_application::{BmcApplication, Encoding, UsbConfig}; +use crate::app::bmc_application::{BmcApplication, UsbConfig}; use crate::app::bmc_info::{ get_fs_stat, get_ipv4_address, get_mac_address, get_net_interfaces, get_storage_info, }; @@ -43,6 +43,8 @@ use std::time::Duration; use tokio::io::AsyncBufReadExt; use tokio_stream::StreamExt; use tokio_util::io::ReaderStream; + +use super::get_node_param; type Query = web::Query>; /// version 1: @@ -168,8 +170,6 @@ async fn api_entry(bmc: web::Data, query: Query) -> impl Respond ("reset", true) => reset_node(bmc, query).await.into(), ("sdcard", true) => format_sdcard().into(), ("sdcard", false) => get_sdcard_info(), - ("uart", true) => write_to_uart(bmc, query).await.into(), - ("uart", false) => read_from_uart(bmc, query).await.into(), ("usb", true) => set_usb_mode(bmc, query).await.into(), ("usb", false) => get_usb_mode(bmc).await.into(), ("info", false) => get_info().await.into(), @@ -343,26 +343,6 @@ async fn set_node_to_msd(bmc: &BmcApplication, query: Query) -> LegacyResult<()> Ok(()) } -fn get_node_param(query: &Query) -> LegacyResult { - let Some(node_str) = query.get("node") else { - return Err(LegacyResponse::bad_request("Missing `node` parameter")); - }; - - let Ok(node_num) = i32::from_str(node_str) else { - return Err(LegacyResponse::bad_request( - "Parameter `node` is not a number", - )); - }; - - let Ok(node) = node_num.try_into() else { - return Err(LegacyResponse::bad_request( - "Parameter `node` is out of range 0..3 of node IDs", - )); - }; - - Ok(node) -} - async fn read_os_release() -> std::io::Result> { let buffer = tokio::fs::read("/etc/os-release").await?; let mut lines = buffer.lines(); @@ -492,56 +472,6 @@ fn get_sdcard_info() -> LegacyResponse { } } -async fn write_to_uart(bmc: &BmcApplication, query: Query) -> LegacyResult<()> { - let node = get_node_param(&query)?; - let Some(cmd) = query.get("cmd") else { - return Err(LegacyResponse::bad_request("Missing `cmd` parameter")); - }; - let mut data = cmd.clone(); - - data.push_str("\r\n"); - - bmc.serial_write(node, data.as_bytes()) - .await - .context("write over UART") - .map_err(Into::into) -} - -async fn read_from_uart(bmc: &BmcApplication, query: Query) -> LegacyResult { - let node = get_node_param(&query)?; - let enc = get_encoding_param(&query)?; - let data = bmc.serial_read(node, enc).await?; - - Ok(LegacyResponse::UartData(data)) -} - -fn get_encoding_param(query: &Query) -> LegacyResult { - let Some(enc_str) = query.get("encoding") else { - return Ok(Encoding::Utf8); - }; - - match enc_str.as_str() { - "utf8" => Ok(Encoding::Utf8), - "utf16" | "utf16le" => Ok(Encoding::Utf16 { - little_endian: true, - }), - "utf16be" => Ok(Encoding::Utf16 { - little_endian: false, - }), - "utf32" | "utf32le" => Ok(Encoding::Utf32 { - little_endian: true, - }), - "utf32be" => Ok(Encoding::Utf32 { - little_endian: false, - }), - _ => { - let msg = "Invalid `encoding` parameter. Expected: utf8, utf16, utf16le, utf16be, \ - utf32, utf32le, utf32be."; - Err(LegacyResponse::bad_request(msg)) - } - } -} - /// switches the USB configuration. /// API values are mapped to the `UsbConfig` as followed: /// diff --git a/src/app/bmc_application.rs b/src/app/bmc_application.rs index b3b0c59..2466c73 100644 --- a/src/app/bmc_application.rs +++ b/src/app/bmc_application.rs @@ -14,12 +14,11 @@ use crate::api::legacy::SetNodeInfo; use crate::hal::helpers::bit_iterator; use crate::hal::PowerController; -use crate::hal::SerialConnections; use crate::hal::{NodeId, PinController, UsbMode, UsbRoute}; use crate::persistency::app_persistency::ApplicationPersistency; use crate::persistency::app_persistency::PersistencyBuilder; use crate::usb_boot::NodeDrivers; -use crate::utils::{get_timestamp_unix, string_from_utf16, string_from_utf32}; +use crate::utils::get_timestamp_unix; use anyhow::{ensure, Context}; use log::{debug, trace}; use serde::{Deserialize, Serialize}; @@ -49,13 +48,6 @@ pub enum UsbConfig { Flashing(NodeId, UsbRoute), } -/// Encodings used when reading from a serial port -pub enum Encoding { - Utf8, - Utf16 { little_endian: bool }, - Utf32 { little_endian: bool }, -} - #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct NodeInfos { pub data: Vec, @@ -90,7 +82,6 @@ pub struct BmcApplication { pub(super) pin_controller: PinController, pub(super) power_controller: PowerController, pub(super) app_db: ApplicationPersistency, - serial: SerialConnections, node_drivers: NodeDrivers, } @@ -105,14 +96,13 @@ impl BmcApplication { .write_timeout(database_write_timeout) .build() .await?; - let serial = SerialConnections::new()?; + let node_drivers = NodeDrivers::new(); let instance = Self { pin_controller, power_controller, app_db, - serial, node_drivers, }; @@ -304,25 +294,6 @@ impl BmcApplication { Ok(()) } - pub async fn start_serial_workers(&self) -> anyhow::Result<()> { - Ok(self.serial.run().await?) - } - - pub async fn serial_read(&self, node: NodeId, encoding: Encoding) -> anyhow::Result { - let bytes = self.serial.read(node).await?; - - let res = match encoding { - Encoding::Utf8 => String::from_utf8_lossy(&bytes).to_string(), - Encoding::Utf16 { little_endian } => string_from_utf16(&bytes, little_endian), - Encoding::Utf32 { little_endian } => string_from_utf32(&bytes, little_endian), - }; - Ok(res) - } - - pub async fn serial_write(&self, node: NodeId, data: &[u8]) -> anyhow::Result<()> { - Ok(self.serial.write(node, data).await?) - } - pub async fn set_node_info(&self, info: SetNodeInfo) -> anyhow::Result<()> { ensure!(info.node >= 1 && info.node <= 4); diff --git a/src/hal.rs b/src/hal.rs index 77b6734..b4f111f 100644 --- a/src/hal.rs +++ b/src/hal.rs @@ -1,5 +1,3 @@ -use std::fmt::Display; - // Copyright 2023 Turing Machines // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,6 +12,7 @@ use std::fmt::Display; // See the License for the specific language governing permissions and // limitations under the License. pub mod helpers; +use std::fmt::Display; macro_rules! conditional_import { ($attribute_condition:meta, $($statement:item)+) => { @@ -29,10 +28,8 @@ conditional_import! { mod gpio_definitions; mod pin_controller; mod power_controller; - mod serial; pub use pin_controller::*; pub use power_controller::*; - pub use serial::*; } conditional_import! { diff --git a/src/hal/serial.rs b/src/hal/serial.rs deleted file mode 100644 index f899229..0000000 --- a/src/hal/serial.rs +++ /dev/null @@ -1,195 +0,0 @@ -// Copyright 2023 Turing Machines -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//! Handlers for UART connections to/from nodes -use std::error::Error; -use std::fmt::Display; -use std::io::{Read, Write}; -use std::sync::Arc; - -use anyhow::Result; -use bytes::{Bytes, BytesMut}; -use circular_buffer::CircularBuffer; -use futures::{SinkExt, StreamExt}; -use tokio::sync::mpsc::{channel, Sender}; -use tokio::sync::Mutex; -use tokio_serial::{DataBits, Parity, SerialPortBuilderExt, StopBits}; -use tokio_util::codec::{BytesCodec, Decoder}; - -use super::NodeId; - -const OUTPUT_BUF_SIZE: usize = 16 * 1024; - -type RingBuffer = CircularBuffer; - -#[derive(Debug)] -pub struct SerialConnections { - handlers: Vec>, -} - -impl SerialConnections { - pub fn new() -> Result { - let paths = ["/dev/ttyS2", "/dev/ttyS1", "/dev/ttyS4", "/dev/ttyS5"]; - - let handlers: Vec> = paths - .iter() - .enumerate() - .map(|(i, path)| Mutex::new(Handler::new(i + 1, path))) - .collect(); - - Ok(SerialConnections { handlers }) - } - - pub async fn run(&self) -> Result<(), SerialError> { - for h in &self.handlers { - h.lock().await.start_reader()?; - } - Ok(()) - } - - pub async fn read(&self, node: NodeId) -> Result { - let idx = node as usize; - self.handlers[idx].lock().await.read().await - } - - pub async fn write>(&self, node: NodeId, data: B) -> Result<(), SerialError> { - let idx = node as usize; - self.handlers[idx].lock().await.write(data.into()).await - } -} - -#[derive(Debug)] -struct Handler { - node: usize, - path: &'static str, - ring_buffer: Arc>>, - worker_context: Option>, -} - -impl Handler { - fn new(node: usize, path: &'static str) -> Self { - Handler { - node, - path, - ring_buffer: Arc::new(Mutex::new(RingBuffer::boxed())), - worker_context: None, - } - } - - async fn write>(&self, data: B) -> Result<(), SerialError> { - let Some(sender) = &self.worker_context else { - return Err(SerialError::NotStarted); - }; - - sender - .send(data.into()) - .await - .map_err(|e| SerialError::InternalError(e.to_string())) - } - - async fn read(&self) -> Result { - if self.worker_context.is_none() { - return Err(SerialError::NotStarted); - }; - - let mut rb = self.ring_buffer.lock().await; - let mut buf = vec![0; rb.len()]; - - rb.read(&mut buf) - .map_err(|e| SerialError::InternalError(format!("failed to read: {}", e)))?; - - Ok(buf.into()) - } - - fn start_reader(&mut self) -> Result<(), SerialError> { - if self.worker_context.take().is_some() { - return Err(SerialError::AlreadyRunning); - }; - - let baud_rate = 115200; - let mut port = tokio_serial::new(self.path, baud_rate) - .data_bits(DataBits::Eight) - .parity(Parity::None) - .stop_bits(StopBits::One) - .open_native_async() - .map_err(|e| SerialError::InternalError(e.to_string()))?; - - // Disable exclusivity of the port to allow other applications to open it. - // Not a reason to abort if we can't. - if let Err(e) = port.set_exclusive(false) { - log::warn!("Unable to set exclusivity of port {}: {}", self.path, e); - } - - let (sender, mut receiver) = channel::(64); - self.worker_context = Some(sender); - - let node = self.node; - let buffer = self.ring_buffer.clone(); - tokio::spawn(async move { - let (mut sink, mut stream) = BytesCodec::new().framed(port).split(); - loop { - tokio::select! { - res = receiver.recv() => { - let Some(data) = res else { - log::error!("error sending data to uart"); - break; - }; - - if let Err(e) = sink.send(data).await { - log::error!("{}", e); - } - }, - res = stream.next() => { - let Some(res) = res else { - log::error!("Error reading serial stream of node {}", node); - break; - }; - - let Ok(bytes) = res else { - log::error!("Serial stream of node {} has closed", node); - break; - }; - - // Implementation is actually infallible in the currently used v0.1.3 - let Ok(_) = buffer.lock().await.write(&bytes) else { - log::error!("Failed to write to buffer of node {}", node); - break; - }; - }, - } - } - log::warn!("exiting serial worker"); - }); - - Ok(()) - } -} - -#[derive(Debug)] -pub enum SerialError { - NotStarted, - AlreadyRunning, - InternalError(String), -} - -impl Error for SerialError {} - -impl Display for SerialError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - SerialError::NotStarted => write!(f, "serial worker not started"), - SerialError::AlreadyRunning => write!(f, "already running"), - SerialError::InternalError(e) => e.fmt(f), - } - } -} diff --git a/src/hal/stub/serial.rs b/src/hal/stub/serial.rs deleted file mode 100644 index 0a4300f..0000000 --- a/src/hal/stub/serial.rs +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright 2023 Turing Machines -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//! Handlers for UART connections to/from nodes -use crate::hal::NodeId; -use anyhow::Result; -use bytes::{Bytes, BytesMut}; -use std::error::Error; -use std::fmt::Display; -#[derive(Debug)] -pub struct SerialConnections; - -impl SerialConnections { - pub fn new() -> Result { - Ok(SerialConnections) - } - - pub async fn run(&self) -> Result<(), SerialError> { - Ok(()) - } - - pub async fn read(&self, _: NodeId) -> Result { - let data: &'static [u8] = b"this is a stub implementation"; - Ok(data.into()) - } - - pub async fn write + std::fmt::Debug>( - &self, - node: NodeId, - data: B, - ) -> Result<(), SerialError> { - log::warn!("writing {}: {:?}", node, data); - Ok(()) - } -} - -#[derive(Debug)] -pub enum SerialError { - NotStarted, - AlreadyRunning, - InternalError(String), -} - -impl Error for SerialError {} - -impl Display for SerialError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - SerialError::NotStarted => write!(f, "serial worker not started"), - SerialError::AlreadyRunning => write!(f, "already running"), - SerialError::InternalError(e) => e.fmt(f), - } - } -} diff --git a/src/main.rs b/src/main.rs index 095edd6..367fc8e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,11 +18,13 @@ mod authentication; mod config; mod hal; mod persistency; +mod serial_service; mod streaming_data_service; mod usb_boot; mod utils; use crate::config::Config; +use crate::serial_service::serial_config; use crate::{ api::legacy, api::legacy::info_config, authentication::linux_authenticator::LinuxAuthenticator, streaming_data_service::StreamingDataService, @@ -59,9 +61,8 @@ async fn main() -> anyhow::Result<()> { let config = Config::try_from(config_path()).context("Error parsing config file")?; let (tls, tls6) = load_tls_config(&config)?; let bmc = Data::new(BmcApplication::new(config.store.write_timeout).await?); - bmc.start_serial_workers().await?; - run_event_listener(bmc.clone().into_inner())?; + let streaming_data_service = Data::new(StreamingDataService::new()); let authentication = Arc::new( LinuxAuthenticator::new( @@ -78,6 +79,7 @@ async fn main() -> anyhow::Result<()> { .app_data(bmc.clone()) .app_data(streaming_data_service.clone()) .wrap(authentication.clone()) + .configure(serial_config) // Legacy API .configure(legacy::config) // Serve a static tree of files of the web UI. Must be the last item. @@ -107,7 +109,6 @@ async fn main() -> anyhow::Result<()> { // run server(s) join_all(futures).await; - log::info!("exiting {}", env!("CARGO_PKG_NAME")); Ok(()) } diff --git a/src/serial_service.rs b/src/serial_service.rs new file mode 100644 index 0000000..0fe119a --- /dev/null +++ b/src/serial_service.rs @@ -0,0 +1,167 @@ +// Copyright 2023 Turing Machines +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +use self::serial::SerialConnections; +use crate::{ + api::{ + get_node_param, + into_legacy_response::{LegacyResponse, LegacyResult}, + }, + hal::NodeId, + utils::{string_from_utf16, string_from_utf32}, +}; +use actix_web::{ + guard::{fn_guard, GuardContext}, + post, + web::{self, Data}, + Responder, +}; +type Query = web::Query>; + +pub mod serial; +pub mod serial_channel; +pub mod serial_handler; + +pub fn serial_config(cfg: &mut web::ServiceConfig) { + let uart = Data::new(SerialService::new()); + cfg.app_data(uart) + .service( + web::resource("/api/bmc") + .route( + web::get() + .guard(fn_guard(legacy_set_uart)) + .to(serial_set_handler), + ) + .route( + web::get() + .guard(fn_guard(legacy_get_uart)) + .to(serial_get_handler), + ), + ) + .service(serial_status); +} + +fn legacy_set_uart(context: &GuardContext<'_>) -> bool { + let Some(query) = context.head().uri.query() else { + return false; + }; + query.contains("opt=set") && query.contains("type=uart") +} + +fn legacy_get_uart(context: &GuardContext<'_>) -> bool { + let Some(query) = context.head().uri.query() else { + return false; + }; + query.contains("opt=get") && query.contains("type=uart") +} + +#[post("/api/bmc/serial/status")] +async fn serial_status(serial_service: web::Data) -> impl Responder { + serial_service + .status() + .map_or_else(|e| e.to_string(), |s| s) +} + +async fn serial_set_handler(uart: web::Data, query: Query) -> impl Responder { + let node = match get_node_param(&query) { + Ok(n) => n, + Err(e) => return e, + }; + + let Some(cmd) = query.get("cmd") else { + return LegacyResponse::bad_request("Missing `cmd` parameter"); + }; + + let mut data = cmd.clone(); + data.push_str("\r\n"); + uart.write_node(node, data.as_bytes().into()).await.into() +} + +async fn serial_get_handler(uart: web::Data, query: Query) -> impl Responder { + let legacy_get = async { + let node = get_node_param(&query)?; + let encoding = get_encoding_param(&query)?; + let data = uart.read_as_string(node, encoding).await?; + Ok::(data) + }; + + legacy_get + .await + .map_or_else(|e| e, LegacyResponse::UartData) +} + +fn get_encoding_param(query: &Query) -> LegacyResult { + let Some(enc_str) = query.get("encoding") else { + return Ok(Encoding::Utf8); + }; + + match enc_str.as_str() { + "utf8" => Ok(Encoding::Utf8), + "utf16" | "utf16le" => Ok(Encoding::Utf16 { + little_endian: true, + }), + "utf16be" => Ok(Encoding::Utf16 { + little_endian: false, + }), + "utf32" | "utf32le" => Ok(Encoding::Utf32 { + little_endian: true, + }), + "utf32be" => Ok(Encoding::Utf32 { + little_endian: false, + }), + _ => { + let msg = "Invalid `encoding` parameter. Expected: utf8, utf16, utf16le, utf16be, \ + utf32, utf32le, utf32be."; + Err(LegacyResponse::bad_request(msg)) + } + } +} + +/// Encodings used when reading from a serial port +pub enum Encoding { + Utf8, + Utf16 { little_endian: bool }, + Utf32 { little_endian: bool }, +} + +pub struct SerialService { + serial_connections: SerialConnections, +} + +impl SerialService { + pub async fn new() -> Self { + let mut serial_connections = SerialConnections::new(); + serial_connections.run().await.expect("Serial run error"); + Self { serial_connections } + } + + pub async fn read_as_string(&self, node: NodeId, encoding: Encoding) -> anyhow::Result { + let handler = &self.serial_connections[node as usize]; + let bytes = handler.read_whole_buffer().await?; + let res = match encoding { + Encoding::Utf8 => String::from_utf8_lossy(&bytes).to_string(), + Encoding::Utf16 { little_endian } => string_from_utf16(&bytes, little_endian), + Encoding::Utf32 { little_endian } => string_from_utf32(&bytes, little_endian), + }; + Ok(res) + } + + pub async fn write_node(&self, node: NodeId, bytes: bytes::BytesMut) -> anyhow::Result<()> { + let handler = &self.serial_connections[node as usize]; + Ok(handler.write(bytes).await?) + } + + pub fn status(&self) -> anyhow::Result { + Ok(serde_json::to_string(&self.serial_connections.get_state())?) + } +} diff --git a/src/serial_service/serial.rs b/src/serial_service/serial.rs new file mode 100644 index 0000000..7c8a416 --- /dev/null +++ b/src/serial_service/serial.rs @@ -0,0 +1,74 @@ +// Copyright 2023 Turing Machines +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//! Handlers for UART connections to/from nodes +use std::ops::Index; + +use super::serial_handler::Handler; +use crate::serial_service::serial_handler::HandlerState; +use crate::serial_service::serial_handler::SerialError; +use anyhow::Result; +use tokio_serial::{DataBits, Parity, StopBits}; + +/// Collection of [`crate::serial_service::serial_handler::Handler`] +#[derive(Debug)] +pub struct SerialConnections { + handlers: Vec, +} + +impl SerialConnections { + pub fn new() -> Self { + let paths = ["/dev/ttyS2", "/dev/ttyS1", "/dev/ttyS4", "/dev/ttyS5"]; + + let handlers: Vec = paths + .iter() + .enumerate() + .map(|(i, path)| { + Handler::new( + i + 1, + path, + 115200, + DataBits::Eight, + Parity::None, + StopBits::One, + ) + }) + .collect(); + + SerialConnections { handlers } + } + + pub async fn run(&mut self) -> Result<(), SerialError> { + for h in &mut self.handlers { + h.run()?; + } + + Ok(()) + } + + pub fn get_state(&self) -> Vec { + self.handlers.iter().map(Handler::get_state).collect() + } +} + +impl Index for SerialConnections { + type Output = Handler; + + fn index(&self, index: usize) -> &Self::Output { + assert!( + self.handlers.len() == 4, + "Serial connections not initialized" + ); + &self.handlers[index] + } +} diff --git a/src/serial_service/serial_channel.rs b/src/serial_service/serial_channel.rs new file mode 100644 index 0000000..c15b80e --- /dev/null +++ b/src/serial_service/serial_channel.rs @@ -0,0 +1,55 @@ +#![allow(dead_code, unused_variables)] +use bytes::BytesMut; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + sync::{ + broadcast, + mpsc::{self}, + }, +}; + +pub struct SerialChannel { + inner: (broadcast::Receiver, mpsc::Sender), +} + +impl SerialChannel { + pub fn new(receiver: broadcast::Receiver, sender: mpsc::Sender) -> Self { + Self { + inner: (receiver, sender), + } + } +} + +impl AsyncRead for SerialChannel { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + todo!() + } +} + +impl AsyncWrite for SerialChannel { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + todo!() + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + todo!() + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + todo!() + } +} diff --git a/src/serial_service/serial_handler.rs b/src/serial_service/serial_handler.rs new file mode 100644 index 0000000..294a71c --- /dev/null +++ b/src/serial_service/serial_handler.rs @@ -0,0 +1,219 @@ +// Copyright 2023 Turing Machines +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +use bytes::BytesMut; +use circular_buffer::CircularBuffer; +use futures::{SinkExt, StreamExt}; +use serde::Serialize; +use std::io::Write; +use std::sync::Arc; +use thiserror::Error; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + sync::{ + broadcast, + mpsc::{self, error::SendError, WeakSender}, + Mutex, + }, +}; +use tokio_serial::{DataBits, Parity, SerialPortBuilderExt, StopBits}; +use tokio_util::codec::{BytesCodec, Decoder}; + +use super::serial_channel::SerialChannel; +type RingBuffer = CircularBuffer; + +const OUTPUT_BUF_SIZE: usize = 16 * 1024; + +/// A [`Handler`] is an object that controls a single UART connection from the +/// BMC to a predefined node. It has 2 functions to directly read and write the +/// serial, see [`Self::write`] and [`Self::read_whole_buffer`]. However its +/// recommended to open a channel using the [`Self::open_channel`] method. A +/// [`Handler`] can handle an arbitrary amount of channels which read/write to +/// the UART port. +#[derive(Debug)] +pub struct Handler { + node: usize, + baud_rate: u32, + data_bits: DataBits, + parity: Parity, + stop_bits: StopBits, + path: &'static str, + ring_buffer: Arc>>, + worker_context: Option<(broadcast::Sender, mpsc::Sender)>, + writer: Option>, +} + +impl Handler { + pub fn new( + node: usize, + path: &'static str, + baud_rate: u32, + data_bits: DataBits, + parity: Parity, + stop_bits: StopBits, + ) -> Self { + Handler { + node, + path, + baud_rate, + data_bits, + parity, + stop_bits, + ring_buffer: Arc::new(Mutex::new(RingBuffer::boxed())), + worker_context: None, + writer: None, + } + } + + /// Returns the current state of the Handler. + pub fn get_state(&self) -> HandlerState { + match &self.worker_context { + Some((_, sender)) if sender.is_closed() => HandlerState::Stopped, + Some(_) => HandlerState::Running, + None => HandlerState::Initialized, + } + } + + /// Opens a stream which can be used to read and write data with. + /// + /// # Returns + /// + /// * `SerialError::NotStarted` when [`Self::run`] was not called + /// successfully + pub fn open_channel(&self) -> Result { + let Some((read_sender, write_sender)) = &self.worker_context else { + return Err(SerialError::NotStarted); + }; + + Ok(SerialChannel::new( + read_sender.subscribe(), + write_sender.clone(), + )) + } + + /// This function returns all the cached data. + /// Time complexity: O(N) + pub async fn read_whole_buffer(&self) -> Result { + if self.worker_context.is_none() { + return Err(SerialError::NotStarted); + }; + + let mut rb = self.ring_buffer.lock().await; + let mut bytes = BytesMut::new(); + bytes.extend_from_slice(rb.make_contiguous()); + Ok(bytes) + } + + pub async fn write(&self, bytes: BytesMut) -> Result<(), SerialError> { + let writer = self + .writer + .as_ref() + .ok_or(SerialError::NotStarted)? + .upgrade() + .ok_or(SerialError::Stopped)?; + + writer.send(bytes).await?; + Ok(()) + } + + pub fn run(&mut self) -> Result<(), SerialError> { + if self.worker_context.take().is_some() { + return Err(SerialError::AlreadyRunning); + }; + + let mut port = tokio_serial::new(self.path, self.baud_rate) + .data_bits(self.data_bits) + .parity(self.parity) + .stop_bits(self.stop_bits) + .open_native_async()?; + + // Disable exclusivity of the port to allow other applications to open it. + // Not a reason to abort if we can't. + if let Err(e) = port.set_exclusive(false) { + log::warn!("Unable to set exclusivity of port {}: {}", self.path, e); + } + + let (read_sender, _) = broadcast::channel::(8); + let (write_sender, mut write_receiver) = mpsc::channel::(8); + self.worker_context = Some((read_sender.clone(), write_sender.clone())); + + let node = self.node; + let buffer = self.ring_buffer.clone(); + tokio::spawn(async move { + let (mut sink, mut stream) = BytesCodec::new().framed(port).split(); + loop { + tokio::select! { + res = write_receiver.recv() => { + let Some(data) = res else { + log::error!("error sending data to uart"); + break; + }; + + if let Err(e) = sink.send(data).await { + log::error!("{}", e); + } + }, + res = stream.next() => { + let Some(res) = res else { + log::error!("Error reading serial stream of node {}", node); + break; + }; + + let Ok(bytes) = res else { + log::error!("Serial stream of node {} has closed", node); + break; + }; + + // Implementation is actually infallible in the currently used v0.1.3 + if buffer.lock().await.write(&bytes).is_err() { + log::error!("Failed to write to buffer of node {}", node); + break; + }; + + if let Err(e) = read_sender.send(bytes) { + log::error!("broadcast error: {:#}", e); + break; + } + }, + } + } + log::warn!("exiting serial worker"); + }); + + self.writer = Some(write_sender.downgrade()); + Ok(()) + } +} + +#[derive(Error, Debug)] +pub enum SerialError { + #[error("serial worker not started")] + NotStarted, + #[error("already running")] + AlreadyRunning, + #[error("Stopped")] + Stopped, + #[error(transparent)] + SendError(#[from] SendError), + #[error(transparent)] + SerialError(#[from] tokio_serial::Error), + #[error(transparent)] + IoError(#[from] std::io::Error), +} + +#[derive(Serialize)] +pub enum HandlerState { + Initialized, + Running, + Stopped, +} diff --git a/src/streaming_data_service/transfer_context.rs b/src/streaming_data_service/transfer_context.rs index d58b0d8..1f07bb2 100644 --- a/src/streaming_data_service/transfer_context.rs +++ b/src/streaming_data_service/transfer_context.rs @@ -18,9 +18,9 @@ use tokio::sync::{mpsc, watch}; use tokio_util::sync::CancellationToken; /// Context object for node flashing. This object acts as a "cancel-guard" for -/// the [`StreamingDataService`]. If [`TransferContext`] gets dropped, it will +/// the [`crate::StreamingDataService`]. If [`TransferContext`] gets dropped, it will /// cancel its "cancel" token, effectively aborting the node flash task. This -/// typically happens on a state transition inside the [`StreamingDataService`]. +/// typically happens on a state transition inside the [`crate::StreamingDataService`]. #[derive(Serialize)] pub struct TransferContext { pub id: u32,