From 26d4be642fc77e587bec61af839d90338d96fdea Mon Sep 17 00:00:00 2001 From: Stefan Lankes Date: Thu, 1 Feb 2024 08:40:05 +0100 Subject: [PATCH 01/19] add support of a unix-like poll function poll() examines a set of file descriptors to see if some of them are ready for I/O or if certain events have occurred on them. The currenty implementation is using an asynchronous function to check all file descriptors. --- Cargo.lock | 12 +++ Cargo.toml | 1 + src/executor/mod.rs | 182 ++++++++++++++++++++++++++++++++++++++++ src/executor/network.rs | 147 +------------------------------- src/fd/mod.rs | 83 ++++++++++++++++++ src/fd/socket/tcp.rs | 11 +-- src/fd/socket/udp.rs | 16 ++-- src/fd/stdio.rs | 56 ++++++++++++- src/fs/mem.rs | 43 +++++++++- src/syscalls/mod.rs | 12 +++ 10 files changed, 406 insertions(+), 157 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a0e25e276e..656e37f7a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -123,6 +123,17 @@ dependencies = [ "bitflags 2.4.2", ] +[[package]] +name = "async-trait" +version = "0.1.76" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "531b97fb4cd3dfdce92c35dedbfdc1f0b9d8091c8ca943d6dae340ef5012d514" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.41", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -493,6 +504,7 @@ dependencies = [ "align-address", "anyhow", "arm-gic", + "async-trait", "bit_field", "bitflags 2.4.2", "build-time", diff --git a/Cargo.toml b/Cargo.toml index efa9e8796c..1379e17d70 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -94,6 +94,7 @@ talc = { version = "4" } time = { version = "0.3", default-features = false } zerocopy = { version = "0.7", features = ["derive"] } build-time = "0.1.3" +async-trait = "0.1.48" [dependencies.smoltcp] version = "0.11" diff --git a/src/executor/mod.rs b/src/executor/mod.rs index 9d786c0ace..0d66c021d4 100644 --- a/src/executor/mod.rs +++ b/src/executor/mod.rs @@ -11,11 +11,33 @@ use alloc::task::Wake; use core::future::Future; use core::sync::atomic::AtomicU32; use core::task::{Context, Poll, Waker}; +use core::time::Duration; +use crossbeam_utils::Backoff; use hermit_sync::without_interrupts; +#[cfg(any(feature = "tcp", feature = "udp"))] +use smoltcp::time::Instant; use crate::arch::core_local::*; +#[cfg(all( + any(feature = "tcp", feature = "udp"), + not(feature = "pci"), + not(feature = "newlib") +))] +use crate::drivers::mmio::get_network_driver; +#[cfg(all(any(feature = "tcp", feature = "udp"), not(feature = "newlib")))] +use crate::drivers::net::NetworkDriver; +#[cfg(all( + any(feature = "tcp", feature = "udp"), + feature = "pci", + not(feature = "newlib") +))] +use crate::drivers::pci::get_network_driver; +#[cfg(all(any(feature = "tcp", feature = "udp"), not(feature = "newlib")))] +use crate::executor::network::network_delay; use crate::executor::task::AsyncTask; +use crate::fd::IoError; +use crate::scheduler::PerCoreSchedulerExt; use crate::synch::futex::*; struct TaskNotify { @@ -77,3 +99,163 @@ pub fn init() { #[cfg(all(any(feature = "tcp", feature = "udp"), not(feature = "newlib")))] crate::executor::network::init(); } + +#[inline] +pub(crate) fn now() -> u64 { + crate::arch::kernel::systemtime::now_micros() + .try_into() + .unwrap() +} + +/// Blocks the current thread on `f`, running the executor when idling. +pub(crate) fn poll_on(future: F, timeout: Option) -> Result +where + F: Future>, +{ + // disable network interrupts + #[cfg(any(feature = "tcp", feature = "udp"))] + let no_retransmission = { + let mut guard = get_network_driver().unwrap().lock(); + guard.set_polling_mode(true); + guard.get_checksums().tcp.tx() + }; + + let start = now(); + let waker = core::task::Waker::noop(); + let mut cx = Context::from_waker(&waker); + let mut future = future; + let mut future = unsafe { core::pin::Pin::new_unchecked(&mut future) }; + + loop { + // run background tasks + run(); + + if let Poll::Ready(t) = future.as_mut().poll(&mut cx) { + #[cfg(any(feature = "tcp", feature = "udp"))] + if !no_retransmission { + let wakeup_time = + network_delay(Instant::from_micros_const(now().try_into().unwrap())) + .map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros()); + core_scheduler().add_network_timer(wakeup_time); + } + + // allow network interrupts + #[cfg(any(feature = "tcp", feature = "udp"))] + get_network_driver().unwrap().lock().set_polling_mode(false); + + return t; + } + + if let Some(duration) = timeout { + if Duration::from_micros(now() - start) >= duration { + #[cfg(any(feature = "tcp", feature = "udp"))] + if !no_retransmission { + let wakeup_time = + network_delay(Instant::from_micros_const(now().try_into().unwrap())) + .map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros()); + core_scheduler().add_network_timer(wakeup_time); + } + + // allow network interrupts + #[cfg(any(feature = "tcp", feature = "udp"))] + get_network_driver().unwrap().lock().set_polling_mode(false); + + return Err(IoError::ETIME); + } + } + } +} + +/// Blocks the current thread on `f`, running the executor when idling. +pub(crate) fn block_on(future: F, timeout: Option) -> Result +where + F: Future>, +{ + // disable network interrupts + #[cfg(any(feature = "tcp", feature = "udp"))] + let no_retransmission = { + let mut guard = get_network_driver().unwrap().lock(); + guard.set_polling_mode(true); + !guard.get_checksums().tcp.tx() + }; + + let backoff = Backoff::new(); + let start = now(); + let task_notify = Arc::new(TaskNotify::new()); + let waker = task_notify.clone().into(); + let mut cx = Context::from_waker(&waker); + let mut future = future; + let mut future = unsafe { core::pin::Pin::new_unchecked(&mut future) }; + + loop { + // run background tasks + run(); + + let now = now(); + if let Poll::Ready(t) = future.as_mut().poll(&mut cx) { + #[cfg(any(feature = "tcp", feature = "udp"))] + if !no_retransmission { + let network_timer = + network_delay(Instant::from_micros_const(now.try_into().unwrap())) + .map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros()); + core_scheduler().add_network_timer(network_timer); + } + + // allow network interrupts + #[cfg(any(feature = "tcp", feature = "udp"))] + get_network_driver().unwrap().lock().set_polling_mode(false); + + return t; + } + + if let Some(duration) = timeout { + if Duration::from_micros(now - start) >= duration { + #[cfg(any(feature = "tcp", feature = "udp"))] + if !no_retransmission { + let network_timer = + network_delay(Instant::from_micros_const(now.try_into().unwrap())) + .map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros()); + core_scheduler().add_network_timer(network_timer); + } + + // allow network interrupts + #[cfg(any(feature = "tcp", feature = "udp"))] + get_network_driver().unwrap().lock().set_polling_mode(false); + + return Err(IoError::ETIME); + } + } + + #[cfg(any(feature = "tcp", feature = "udp"))] + let delay = network_delay(Instant::from_micros_const(now.try_into().unwrap())) + .map(|d| d.total_micros()); + #[cfg(not(any(feature = "tcp", feature = "udp")))] + let delay = None; + + if backoff.is_completed() && delay.unwrap_or(10_000_000) > 10_000 { + let wakeup_time = timeout.map(|duration| { + u64::try_from(start).unwrap() + u64::try_from(duration.as_micros()).unwrap() + }); + #[cfg(any(feature = "tcp", feature = "udp"))] + if !no_retransmission { + let ticks = crate::arch::processor::get_timer_ticks(); + let network_timer = delay.map(|d| ticks + d); + core_scheduler().add_network_timer(network_timer); + } + + // allow network interrupts + #[cfg(any(feature = "tcp", feature = "udp"))] + get_network_driver().unwrap().lock().set_polling_mode(false); + + // switch to another task + task_notify.wait(wakeup_time); + + // restore default values + #[cfg(any(feature = "tcp", feature = "udp"))] + get_network_driver().unwrap().lock().set_polling_mode(true); + backoff.reset(); + } else { + backoff.snooze(); + } + } +} diff --git a/src/executor/network.rs b/src/executor/network.rs index 96677786ea..907a6ac5fe 100644 --- a/src/executor/network.rs +++ b/src/executor/network.rs @@ -1,12 +1,9 @@ use alloc::boxed::Box; -use alloc::sync::Arc; use core::future; -use core::future::Future; use core::ops::DerefMut; use core::sync::atomic::{AtomicU16, Ordering}; -use core::task::{Context, Poll}; +use core::task::Poll; -use crossbeam_utils::Backoff; use hermit_sync::InterruptTicketMutex; use smoltcp::iface::{SocketHandle, SocketSet}; #[cfg(feature = "dhcpv4")] @@ -21,15 +18,8 @@ use smoltcp::time::{Duration, Instant}; use smoltcp::wire::{IpCidr, Ipv4Address, Ipv4Cidr}; use crate::arch; -use crate::arch::core_local::*; -#[cfg(not(feature = "pci"))] -use crate::drivers::mmio::get_network_driver; -use crate::drivers::net::NetworkDriver; -#[cfg(feature = "pci")] -use crate::drivers::pci::get_network_driver; use crate::executor::device::HermitNet; -use crate::executor::{spawn, TaskNotify}; -use crate::fd::IoError; +use crate::executor::spawn; use crate::scheduler::PerCoreSchedulerExt; pub(crate) enum NetworkState<'a> { @@ -230,7 +220,7 @@ impl<'a> NetworkInterface<'a> { } #[inline] -fn network_delay(timestamp: Instant) -> Option { +pub(crate) fn network_delay(timestamp: Instant) -> Option { crate::executor::network::NIC .lock() .as_nic_mut() @@ -246,134 +236,3 @@ fn network_poll(timestamp: Instant) { .unwrap() .poll_common(timestamp); } - -/// Blocks the current thread on `f`, running the executor when idling. -pub(crate) fn block_on(future: F, timeout: Option) -> Result -where - F: Future>, -{ - // disable network interrupts - let no_retransmission = { - let mut guard = get_network_driver().unwrap().lock(); - guard.set_polling_mode(true); - !guard.get_checksums().tcp.tx() - }; - - let backoff = Backoff::new(); - let start = now(); - let task_notify = Arc::new(TaskNotify::new()); - let waker = task_notify.clone().into(); - let mut cx = Context::from_waker(&waker); - let mut future = future; - let mut future = unsafe { core::pin::Pin::new_unchecked(&mut future) }; - - loop { - // run background tasks - crate::executor::run(); - - let now = crate::executor::network::now(); - - if let Poll::Ready(t) = future.as_mut().poll(&mut cx) { - if !no_retransmission { - let network_timer = network_delay(now) - .map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros()); - core_scheduler().add_network_timer(network_timer); - } - - // allow network interrupts - get_network_driver().unwrap().lock().set_polling_mode(false); - - return t; - } - - if let Some(duration) = timeout { - if crate::executor::network::now() >= start + duration { - if !no_retransmission { - let network_timer = network_delay(now) - .map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros()); - core_scheduler().add_network_timer(network_timer); - } - - // allow network interrupts - get_network_driver().unwrap().lock().set_polling_mode(false); - - return Err(IoError::ETIME); - } - } - - let delay = network_delay(now).map(|d| d.total_micros()); - if backoff.is_completed() && delay.unwrap_or(10_000_000) > 10_000 { - let wakeup_time = - timeout.map(|duration| u64::try_from((start + duration).total_micros()).unwrap()); - if !no_retransmission { - let ticks = crate::arch::processor::get_timer_ticks(); - let network_timer = delay.map(|d| ticks + d); - core_scheduler().add_network_timer(network_timer); - } - - // allow network interrupts - get_network_driver().unwrap().lock().set_polling_mode(false); - - // switch to another task - task_notify.wait(wakeup_time); - - // restore default values - get_network_driver().unwrap().lock().set_polling_mode(true); - backoff.reset(); - } else { - backoff.snooze(); - } - } -} - -/// Blocks the current thread on `f`, running the executor when idling. -pub(crate) fn poll_on(future: F, timeout: Option) -> Result -where - F: Future>, -{ - // disable network interrupts - let no_retransmission = { - let mut guard = get_network_driver().unwrap().lock(); - guard.set_polling_mode(true); - guard.get_checksums().tcp.tx() - }; - - let start = now(); - let waker = core::task::Waker::noop(); - let mut cx = Context::from_waker(&waker); - let mut future = future; - let mut future = unsafe { core::pin::Pin::new_unchecked(&mut future) }; - - loop { - // run background tasks - crate::executor::run(); - - if let Poll::Ready(t) = future.as_mut().poll(&mut cx) { - if !no_retransmission { - let wakeup_time = network_delay(now()) - .map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros()); - core_scheduler().add_network_timer(wakeup_time); - } - - // allow network interrupts - get_network_driver().unwrap().lock().set_polling_mode(false); - - return t; - } - - if let Some(duration) = timeout { - if crate::executor::network::now() >= start + duration { - if !no_retransmission { - let wakeup_time = network_delay(now()) - .map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros()); - core_scheduler().add_network_timer(wakeup_time); - } - - // allow network interrupts - get_network_driver().unwrap().lock().set_polling_mode(false); - - return Err(IoError::ETIME); - } - } - } -} diff --git a/src/fd/mod.rs b/src/fd/mod.rs index 2fc4cffcc9..980fd2c169 100644 --- a/src/fd/mod.rs +++ b/src/fd/mod.rs @@ -1,14 +1,20 @@ +use alloc::boxed::Box; use alloc::sync::Arc; use alloc::vec::Vec; +use core::future::{self, Future}; use core::sync::atomic::{AtomicI32, Ordering}; +use core::task::Poll::{Pending, Ready}; +use core::time::Duration; use ahash::RandomState; +use async_trait::async_trait; use dyn_clone::DynClone; use hashbrown::HashMap; #[cfg(all(any(feature = "tcp", feature = "udp"), not(feature = "newlib")))] use smoltcp::wire::{IpEndpoint, IpListenEndpoint}; use crate::env; +use crate::executor::{block_on, poll_on}; use crate::fd::stdio::*; use crate::fs::{self, DirectoryEntry, FileAttr, SeekWhence}; @@ -80,6 +86,35 @@ bitflags! { } } +bitflags! { + #[derive(Debug, Copy, Clone, Default)] + pub struct PollEvent: i16 { + const EMPTY = 0; + const POLLIN = 0x1; + const POLLPRI = 0x2; + const POLLOUT = 0x4; + const POLLERR = 0x8; + const POLLHUP = 0x10; + const POLLNVAL = 0x20; + const POLLRDNORM = 0x040; + const POLLRDBAND = 0x080; + const POLLWRNORM = 0x0100; + const POLLWRBAND = 0x0200; + const POLLRDHUP = 0x2000; + } +} + +#[repr(C)] +#[derive(Debug, Default, Copy, Clone)] +pub struct PollFd { + /// file descriptor + pub fd: i32, + /// events to look for + pub events: PollEvent, + /// events returned + pub revents: PollEvent, +} + bitflags! { #[derive(Debug, Copy, Clone)] pub struct AccessPermission: u32 { @@ -112,7 +147,13 @@ impl Default for AccessPermission { } } +#[async_trait] pub(crate) trait ObjectInterface: Sync + Send + core::fmt::Debug + DynClone { + /// check if an IO event is possible + async fn poll(&self, _event: PollEvent) -> Result { + Ok(PollEvent::EMPTY) + } + /// `read` attempts to read `len` bytes from the object references /// by the descriptor fn read(&self, _buf: &mut [u8]) -> Result { @@ -275,6 +316,48 @@ pub(crate) fn write(fd: FileDescriptor, buf: &[u8]) -> Result { get_object(fd)?.write(buf) } +async fn poll_fds(fds: &mut [PollFd]) -> Result<(), IoError> { + future::poll_fn(|mut cx| { + let mut ready: bool = false; + + for i in &mut *fds { + let fd = i.fd; + if let Ok(obj) = get_object(fd) { + let mut pinned = core::pin::pin!(obj.poll(i.events)); + if let Ready(event) = pinned.as_mut().poll(&mut cx) { + if let Ok(e) = event { + ready = true; + i.revents = e; + } + } + } + } + + if ready { + Ready(()) + } else { + Pending + } + }) + .await; + + Ok(()) +} + +pub(crate) fn poll(fds: &mut [PollFd], timeout: i32) -> Result<(), IoError> { + if timeout >= 0 { + let timeout = if timeout > 0 { + Some(Duration::from_millis(timeout.try_into().unwrap())) + } else { + None + }; + + poll_on(poll_fds(fds), timeout) + } else { + block_on(poll_fds(fds), None) + } +} + pub(crate) fn get_object(fd: FileDescriptor) -> Result, IoError> { Ok((*(OBJECT_MAP.read().get(&fd).ok_or(IoError::EINVAL)?)).clone()) } diff --git a/src/fd/socket/tcp.rs b/src/fd/socket/tcp.rs index 896f968ce2..2e4c0e83fe 100644 --- a/src/fd/socket/tcp.rs +++ b/src/fd/socket/tcp.rs @@ -8,7 +8,8 @@ use smoltcp::socket::tcp; use smoltcp::time::Duration; use smoltcp::wire::{IpEndpoint, IpListenEndpoint}; -use crate::executor::network::{block_on, now, poll_on, Handle, NetworkState, NIC}; +use crate::executor::network::{now, Handle, NetworkState, NIC}; +use crate::executor::{block_on, poll_on}; use crate::fd::{IoCtl, IoError, ObjectInterface, SocketOption}; use crate::syscalls::net::*; use crate::DEFAULT_KEEP_ALIVE_INTERVAL; @@ -245,7 +246,7 @@ impl ObjectInterface for Socket { fn connect(&self, endpoint: IpEndpoint) -> Result<(), IoError> { if self.nonblocking.load(Ordering::Acquire) { - block_on(self.async_connect(endpoint), Some(Duration::ZERO)).map_err(|x| { + block_on(self.async_connect(endpoint), Some(Duration::ZERO.into())).map_err(|x| { if x == IoError::ETIME { IoError::EAGAIN } else { @@ -275,7 +276,7 @@ impl ObjectInterface for Socket { } if self.nonblocking.load(Ordering::Acquire) { - poll_on(self.async_read(buf), Some(Duration::ZERO)).map_err(|x| { + poll_on(self.async_read(buf), Some(Duration::ZERO.into())).map_err(|x| { if x == IoError::ETIME { IoError::EAGAIN } else { @@ -283,7 +284,7 @@ impl ObjectInterface for Socket { } }) } else { - match poll_on(self.async_read(buf), Some(Duration::from_secs(2))) { + match poll_on(self.async_read(buf), Some(Duration::from_secs(2).into())) { Err(IoError::ETIME) => block_on(self.async_read(buf), None), Err(x) => Err(x), Ok(x) => Ok(x), @@ -297,7 +298,7 @@ impl ObjectInterface for Socket { } if self.nonblocking.load(Ordering::Acquire) { - poll_on(self.async_write(buf), Some(Duration::ZERO)).map_err(|x| { + poll_on(self.async_write(buf), Some(Duration::ZERO.into())).map_err(|x| { if x == IoError::ETIME { IoError::EAGAIN } else { diff --git a/src/fd/socket/udp.rs b/src/fd/socket/udp.rs index a3002855e6..c2bcfa496e 100644 --- a/src/fd/socket/udp.rs +++ b/src/fd/socket/udp.rs @@ -10,6 +10,7 @@ use smoltcp::time::Duration; use smoltcp::wire::{IpEndpoint, IpListenEndpoint}; use crate::executor::network::{block_on, now, poll_on, Handle, NetworkState, NIC}; +use crate::executor::poll_on; use crate::fd::{IoCtl, IoError, ObjectInterface}; #[derive(Debug)] @@ -155,7 +156,7 @@ impl ObjectInterface for Socket { let meta = UdpMetadata::from(endpoint); if self.nonblocking.load(Ordering::Acquire) { - poll_on(self.async_write(buf, &meta), Some(Duration::ZERO)) + poll_on(self.async_write(buf, &meta), Some(Duration::ZERO.into())) } else { poll_on(self.async_write(buf, &meta), None) } @@ -163,7 +164,7 @@ impl ObjectInterface for Socket { fn recvfrom(&self, buf: &mut [u8]) -> Result<(usize, IpEndpoint), IoError> { if self.nonblocking.load(Ordering::Acquire) { - poll_on(self.async_recvfrom(buf), Some(Duration::ZERO)).map_err(|x| { + poll_on(self.async_recvfrom(buf), Some(Duration::ZERO.into())).map_err(|x| { if x == IoError::ETIME { IoError::EAGAIN } else { @@ -171,7 +172,10 @@ impl ObjectInterface for Socket { } }) } else { - match poll_on(self.async_recvfrom(buf), Some(Duration::from_secs(2))) { + match poll_on( + self.async_recvfrom(buf), + Some(Duration::from_secs(2).into()), + ) { Err(IoError::ETIME) => block_on(self.async_recvfrom(buf), None), Err(x) => Err(x), Ok(x) => Ok(x), @@ -185,7 +189,7 @@ impl ObjectInterface for Socket { } if self.nonblocking.load(Ordering::Acquire) { - poll_on(self.async_read(buf), Some(Duration::ZERO)).map_err(|x| { + poll_on(self.async_read(buf), Some(Duration::ZERO.into())).map_err(|x| { if x == IoError::ETIME { IoError::EAGAIN } else { @@ -193,7 +197,7 @@ impl ObjectInterface for Socket { } }) } else { - match poll_on(self.async_read(buf), Some(Duration::from_secs(2))) { + match poll_on(self.async_read(buf), Some(Duration::from_secs(2).into())) { Err(IoError::ETIME) => block_on(self.async_read(buf), None), Err(x) => Err(x), Ok(x) => Ok(x), @@ -214,7 +218,7 @@ impl ObjectInterface for Socket { let meta = UdpMetadata::from(endpoint.unwrap()); if self.nonblocking.load(Ordering::Acquire) { - poll_on(self.async_write(buf, &meta), Some(Duration::ZERO)) + poll_on(self.async_write(buf, &meta), Some(Duration::ZERO.into())) } else { poll_on(self.async_write(buf, &meta), None) } diff --git a/src/fd/stdio.rs b/src/fd/stdio.rs index 79fec62f77..4c61ddfe01 100644 --- a/src/fd/stdio.rs +++ b/src/fd/stdio.rs @@ -1,13 +1,15 @@ +use alloc::boxed::Box; #[cfg(any(target_arch = "x86_64", target_arch = "aarch64"))] use core::ptr; +use async_trait::async_trait; #[cfg(target_arch = "x86_64")] use x86::io::*; #[cfg(any(target_arch = "x86_64", target_arch = "aarch64"))] use crate::arch::mm::{paging, VirtAddr}; use crate::console::CONSOLE; -use crate::fd::{IoError, ObjectInterface, STDERR_FILENO, STDOUT_FILENO}; +use crate::fd::{IoError, ObjectInterface, PollEvent, STDERR_FILENO, STDOUT_FILENO}; const UHYVE_PORT_WRITE: u16 = 0x400; @@ -76,7 +78,20 @@ impl GenericStdin { #[derive(Debug, Clone)] pub struct GenericStdout; +#[async_trait] impl ObjectInterface for GenericStdout { + async fn poll(&self, event: PollEvent) -> Result { + if event.contains(PollEvent::POLLOUT) { + Ok(PollEvent::POLLOUT) + } else if event.contains(PollEvent::POLLWRNORM) { + Ok(PollEvent::POLLWRNORM) + } else if event.contains(PollEvent::POLLWRBAND) { + Ok(PollEvent::POLLWRBAND) + } else { + Ok(PollEvent::EMPTY) + } + } + fn write(&self, buf: &[u8]) -> Result { // stdin/err/out all go to console CONSOLE.lock().write_all(buf); @@ -94,7 +109,20 @@ impl GenericStdout { #[derive(Debug, Clone)] pub struct GenericStderr; +#[async_trait] impl ObjectInterface for GenericStderr { + async fn poll(&self, event: PollEvent) -> Result { + if event.contains(PollEvent::POLLOUT) { + Ok(PollEvent::POLLOUT) + } else if event.contains(PollEvent::POLLWRNORM) { + Ok(PollEvent::POLLWRNORM) + } else if event.contains(PollEvent::POLLWRBAND) { + Ok(PollEvent::POLLWRBAND) + } else { + Ok(PollEvent::EMPTY) + } + } + fn write(&self, buf: &[u8]) -> Result { // stdin/err/out all go to console CONSOLE.lock().write_all(buf); @@ -123,7 +151,20 @@ impl UhyveStdin { #[derive(Debug, Clone)] pub struct UhyveStdout; +#[async_trait] impl ObjectInterface for UhyveStdout { + async fn poll(&self, event: PollEvent) -> Result { + if event.contains(PollEvent::POLLOUT) { + Ok(PollEvent::POLLOUT) + } else if event.contains(PollEvent::POLLWRNORM) { + Ok(PollEvent::POLLWRNORM) + } else if event.contains(PollEvent::POLLWRBAND) { + Ok(PollEvent::POLLWRBAND) + } else { + Ok(PollEvent::EMPTY) + } + } + fn write(&self, buf: &[u8]) -> Result { let mut syswrite = SysWrite::new(STDOUT_FILENO, buf.as_ptr(), buf.len()); uhyve_send(UHYVE_PORT_WRITE, &mut syswrite); @@ -141,7 +182,20 @@ impl UhyveStdout { #[derive(Debug, Clone)] pub struct UhyveStderr; +#[async_trait] impl ObjectInterface for UhyveStderr { + async fn poll(&self, event: PollEvent) -> Result { + if event.contains(PollEvent::POLLOUT) { + Ok(PollEvent::POLLOUT) + } else if event.contains(PollEvent::POLLWRNORM) { + Ok(PollEvent::POLLWRNORM) + } else if event.contains(PollEvent::POLLWRBAND) { + Ok(PollEvent::POLLWRBAND) + } else { + Ok(PollEvent::EMPTY) + } + } + fn write(&self, buf: &[u8]) -> Result { let mut syswrite = SysWrite::new(STDERR_FILENO, buf.as_ptr(), buf.len()); uhyve_send(UHYVE_PORT_WRITE, &mut syswrite); diff --git a/src/fs/mem.rs b/src/fs/mem.rs index 6b80376a75..5743ca626f 100644 --- a/src/fs/mem.rs +++ b/src/fs/mem.rs @@ -16,10 +16,11 @@ use alloc::sync::Arc; use alloc::vec::Vec; use core::slice; +use async_trait::async_trait; use hermit_sync::{RwSpinLock, SpinMutex}; use crate::arch; -use crate::fd::{AccessPermission, IoError, ObjectInterface, OpenOption}; +use crate::fd::{AccessPermission, IoError, ObjectInterface, OpenOption, PollEvent}; use crate::fs::{DirectoryEntry, FileAttr, NodeKind, VfsNode}; #[derive(Debug)] @@ -45,7 +46,24 @@ struct RomFileInterface { inner: Arc>, } +#[async_trait] impl ObjectInterface for RomFileInterface { + async fn poll(&self, event: PollEvent) -> Result { + let len = self.inner.read().data.len(); + let pos_guard = self.pos.lock(); + let pos = *pos_guard; + + if event.contains(PollEvent::POLLIN) && pos < len { + Ok(PollEvent::POLLIN) + } else if event.contains(PollEvent::POLLRDNORM) && pos < len { + Ok(PollEvent::POLLRDNORM) + } else if event.contains(PollEvent::POLLRDBAND) && pos < len { + Ok(PollEvent::POLLRDBAND) + } else { + Ok(PollEvent::EMPTY) + } + } + fn read(&self, buf: &mut [u8]) -> Result { { let microseconds = arch::kernel::systemtime::now_micros(); @@ -111,7 +129,30 @@ pub struct RamFileInterface { inner: Arc>, } +#[async_trait] impl ObjectInterface for RamFileInterface { + async fn poll(&self, event: PollEvent) -> Result { + let len = self.inner.read().data.len(); + let pos_guard = self.pos.lock(); + let pos = *pos_guard; + + if event.contains(PollEvent::POLLIN) && pos < len { + Ok(PollEvent::POLLIN) + } else if event.contains(PollEvent::POLLRDNORM) && pos < len { + Ok(PollEvent::POLLRDNORM) + } else if event.contains(PollEvent::POLLRDBAND) && pos < len { + Ok(PollEvent::POLLRDBAND) + } else if event.contains(PollEvent::POLLOUT) { + Ok(PollEvent::POLLOUT) + } else if event.contains(PollEvent::POLLWRNORM) { + Ok(PollEvent::POLLWRNORM) + } else if event.contains(PollEvent::POLLWRBAND) { + Ok(PollEvent::POLLWRBAND) + } else { + Ok(PollEvent::EMPTY) + } + } + fn read(&self, buf: &mut [u8]) -> Result { { let microseconds = arch::kernel::systemtime::now_micros(); diff --git a/src/syscalls/mod.rs b/src/syscalls/mod.rs index 7a805d36de..e9bf260c9d 100644 --- a/src/syscalls/mod.rs +++ b/src/syscalls/mod.rs @@ -21,6 +21,7 @@ pub use self::timer::*; use crate::env; use crate::fd::{ dup_object, get_object, remove_object, AccessPermission, FileDescriptor, IoCtl, OpenOption, + PollFd, }; use crate::fs::{self, FileAttr}; use crate::syscalls::interfaces::SyscallInterface; @@ -415,6 +416,17 @@ pub extern "C" fn sys_dup(fd: i32) -> i32 { kernel_function!(__sys_dup(fd)) } +extern "C" fn __sys_poll(fds: *mut PollFd, nfds: usize, timeout: i32) -> i32 { + let slice = unsafe { core::slice::from_raw_parts_mut(fds, nfds) }; + + crate::fd::poll(slice, timeout).map_or_else(|e| -num::ToPrimitive::to_i32(&e).unwrap(), |_| 0) +} + +#[no_mangle] +pub extern "C" fn sys_poll(fds: *mut PollFd, nfds: usize, timeout: i32) -> i32 { + kernel_function!(__sys_poll(fds, nfds, timeout)) +} + extern "C" fn __sys_image_start_addr() -> usize { crate::mm::kernel_start_address().0.try_into().unwrap() } From 2da95bf6544ea2223f2fc0def10749196d8d96aa Mon Sep 17 00:00:00 2001 From: Stefan Lankes Date: Thu, 1 Feb 2024 08:57:28 +0100 Subject: [PATCH 02/19] remove clippy warnings --- src/executor/mod.rs | 7 ++----- src/fd/mod.rs | 10 ++++------ 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/src/executor/mod.rs b/src/executor/mod.rs index 0d66c021d4..6a0f91bec5 100644 --- a/src/executor/mod.rs +++ b/src/executor/mod.rs @@ -103,8 +103,6 @@ pub fn init() { #[inline] pub(crate) fn now() -> u64 { crate::arch::kernel::systemtime::now_micros() - .try_into() - .unwrap() } /// Blocks the current thread on `f`, running the executor when idling. @@ -233,9 +231,8 @@ where let delay = None; if backoff.is_completed() && delay.unwrap_or(10_000_000) > 10_000 { - let wakeup_time = timeout.map(|duration| { - u64::try_from(start).unwrap() + u64::try_from(duration.as_micros()).unwrap() - }); + let wakeup_time = + timeout.map(|duration| start + u64::try_from(duration.as_micros()).unwrap()); #[cfg(any(feature = "tcp", feature = "udp"))] if !no_retransmission { let ticks = crate::arch::processor::get_timer_ticks(); diff --git a/src/fd/mod.rs b/src/fd/mod.rs index 980fd2c169..6e0c04a50b 100644 --- a/src/fd/mod.rs +++ b/src/fd/mod.rs @@ -317,18 +317,16 @@ pub(crate) fn write(fd: FileDescriptor, buf: &[u8]) -> Result { } async fn poll_fds(fds: &mut [PollFd]) -> Result<(), IoError> { - future::poll_fn(|mut cx| { + future::poll_fn(|cx| { let mut ready: bool = false; for i in &mut *fds { let fd = i.fd; if let Ok(obj) = get_object(fd) { let mut pinned = core::pin::pin!(obj.poll(i.events)); - if let Ready(event) = pinned.as_mut().poll(&mut cx) { - if let Ok(e) = event { - ready = true; - i.revents = e; - } + if let Ready(Ok(e)) = pinned.as_mut().poll(cx) { + ready = true; + i.revents = e; } } } From 7d671c02223eb42416518cb20f1b8c2fa9e699f0 Mon Sep 17 00:00:00 2001 From: Stefan Lankes Date: Thu, 1 Feb 2024 09:16:51 +0100 Subject: [PATCH 03/19] import block_on directly from executor/mod.rs --- src/fd/socket/udp.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/fd/socket/udp.rs b/src/fd/socket/udp.rs index c2bcfa496e..eb8e55d2fd 100644 --- a/src/fd/socket/udp.rs +++ b/src/fd/socket/udp.rs @@ -9,8 +9,8 @@ use smoltcp::socket::udp::UdpMetadata; use smoltcp::time::Duration; use smoltcp::wire::{IpEndpoint, IpListenEndpoint}; -use crate::executor::network::{block_on, now, poll_on, Handle, NetworkState, NIC}; -use crate::executor::poll_on; +use crate::executor::network::{now, Handle, NetworkState, NIC}; +use crate::executor::{block_on, poll_on}; use crate::fd::{IoCtl, IoError, ObjectInterface}; #[derive(Debug)] From 97633dba3e4e3949221141dfb775a9c8a4dc44a0 Mon Sep 17 00:00:00 2001 From: Stefan Lankes Date: Thu, 1 Feb 2024 09:41:32 +0100 Subject: [PATCH 04/19] remove clippy warnings --- src/executor/mod.rs | 61 +++++++++++++++++++++++++++------------------ 1 file changed, 37 insertions(+), 24 deletions(-) diff --git a/src/executor/mod.rs b/src/executor/mod.rs index 6a0f91bec5..f51ecdd4fa 100644 --- a/src/executor/mod.rs +++ b/src/executor/mod.rs @@ -37,6 +37,7 @@ use crate::drivers::pci::get_network_driver; use crate::executor::network::network_delay; use crate::executor::task::AsyncTask; use crate::fd::IoError; +#[cfg(all(any(feature = "tcp", feature = "udp"), not(feature = "newlib")))] use crate::scheduler::PerCoreSchedulerExt; use crate::synch::futex::*; @@ -225,34 +226,46 @@ where } #[cfg(any(feature = "tcp", feature = "udp"))] - let delay = network_delay(Instant::from_micros_const(now.try_into().unwrap())) - .map(|d| d.total_micros()); - #[cfg(not(any(feature = "tcp", feature = "udp")))] - let delay = None; + { + let delay = network_delay(Instant::from_micros_const(now.try_into().unwrap())) + .map(|d| d.total_micros()); - if backoff.is_completed() && delay.unwrap_or(10_000_000) > 10_000 { - let wakeup_time = - timeout.map(|duration| start + u64::try_from(duration.as_micros()).unwrap()); - #[cfg(any(feature = "tcp", feature = "udp"))] - if !no_retransmission { - let ticks = crate::arch::processor::get_timer_ticks(); - let network_timer = delay.map(|d| ticks + d); - core_scheduler().add_network_timer(network_timer); - } + if backoff.is_completed() && delay.unwrap_or(10_000_000) > 10_000 { + let wakeup_time = + timeout.map(|duration| start + u64::try_from(duration.as_micros()).unwrap()); + if !no_retransmission { + let ticks = crate::arch::processor::get_timer_ticks(); + let network_timer = delay.map(|d| ticks + d); + core_scheduler().add_network_timer(network_timer); + } - // allow network interrupts - #[cfg(any(feature = "tcp", feature = "udp"))] - get_network_driver().unwrap().lock().set_polling_mode(false); + // allow network interrupts + get_network_driver().unwrap().lock().set_polling_mode(false); - // switch to another task - task_notify.wait(wakeup_time); + // switch to another task + task_notify.wait(wakeup_time); - // restore default values - #[cfg(any(feature = "tcp", feature = "udp"))] - get_network_driver().unwrap().lock().set_polling_mode(true); - backoff.reset(); - } else { - backoff.snooze(); + // restore default values + get_network_driver().unwrap().lock().set_polling_mode(true); + backoff.reset(); + } else { + backoff.snooze(); + } + } + #[cfg(not(any(feature = "tcp", feature = "udp")))] + { + if backoff.is_completed() { + let wakeup_time = + timeout.map(|duration| start + u64::try_from(duration.as_micros()).unwrap()); + + // switch to another task + task_notify.wait(wakeup_time); + + // restore default values + backoff.reset(); + } else { + backoff.snooze(); + } } } } From 3387131155f365d461fdc6a8e3e629656d6d8f80 Mon Sep 17 00:00:00 2001 From: Stefan Lankes Date: Thu, 1 Feb 2024 23:24:18 +0100 Subject: [PATCH 05/19] revise poll function and check all possible requests --- src/fd/stdio.rs | 48 ++++++++++++++++++++++++++++-------------------- src/fs/mem.rs | 28 +++++++++++++++------------- 2 files changed, 43 insertions(+), 33 deletions(-) diff --git a/src/fd/stdio.rs b/src/fd/stdio.rs index 4c61ddfe01..55fa67ae8f 100644 --- a/src/fd/stdio.rs +++ b/src/fd/stdio.rs @@ -81,15 +81,17 @@ pub struct GenericStdout; #[async_trait] impl ObjectInterface for GenericStdout { async fn poll(&self, event: PollEvent) -> Result { + let mut result: PollEvent = PollEvent::EMPTY; + if event.contains(PollEvent::POLLOUT) { - Ok(PollEvent::POLLOUT) + result.insert(PollEvent::POLLOUT); } else if event.contains(PollEvent::POLLWRNORM) { - Ok(PollEvent::POLLWRNORM) + result.insert(PollEvent::POLLWRNORM); } else if event.contains(PollEvent::POLLWRBAND) { - Ok(PollEvent::POLLWRBAND) - } else { - Ok(PollEvent::EMPTY) + result.insert(PollEvent::POLLWRBAND); } + + Ok(result) } fn write(&self, buf: &[u8]) -> Result { @@ -112,15 +114,17 @@ pub struct GenericStderr; #[async_trait] impl ObjectInterface for GenericStderr { async fn poll(&self, event: PollEvent) -> Result { + let mut result: PollEvent = PollEvent::EMPTY; + if event.contains(PollEvent::POLLOUT) { - Ok(PollEvent::POLLOUT) + result.insert(PollEvent::POLLOUT); } else if event.contains(PollEvent::POLLWRNORM) { - Ok(PollEvent::POLLWRNORM) + result.insert(PollEvent::POLLWRNORM); } else if event.contains(PollEvent::POLLWRBAND) { - Ok(PollEvent::POLLWRBAND) - } else { - Ok(PollEvent::EMPTY) + result.insert(PollEvent::POLLWRBAND); } + + Ok(result) } fn write(&self, buf: &[u8]) -> Result { @@ -154,15 +158,17 @@ pub struct UhyveStdout; #[async_trait] impl ObjectInterface for UhyveStdout { async fn poll(&self, event: PollEvent) -> Result { + let mut result: PollEvent = PollEvent::EMPTY; + if event.contains(PollEvent::POLLOUT) { - Ok(PollEvent::POLLOUT) + result.insert(PollEvent::POLLOUT); } else if event.contains(PollEvent::POLLWRNORM) { - Ok(PollEvent::POLLWRNORM) + result.insert(PollEvent::POLLWRNORM); } else if event.contains(PollEvent::POLLWRBAND) { - Ok(PollEvent::POLLWRBAND) - } else { - Ok(PollEvent::EMPTY) + result.insert(PollEvent::POLLWRBAND); } + + Ok(result) } fn write(&self, buf: &[u8]) -> Result { @@ -185,15 +191,17 @@ pub struct UhyveStderr; #[async_trait] impl ObjectInterface for UhyveStderr { async fn poll(&self, event: PollEvent) -> Result { + let mut result: PollEvent = PollEvent::EMPTY; + if event.contains(PollEvent::POLLOUT) { - Ok(PollEvent::POLLOUT) + result.insert(PollEvent::POLLOUT); } else if event.contains(PollEvent::POLLWRNORM) { - Ok(PollEvent::POLLWRNORM) + result.insert(PollEvent::POLLWRNORM); } else if event.contains(PollEvent::POLLWRBAND) { - Ok(PollEvent::POLLWRBAND) - } else { - Ok(PollEvent::EMPTY) + result.insert(PollEvent::POLLWRBAND); } + + Ok(result) } fn write(&self, buf: &[u8]) -> Result { diff --git a/src/fs/mem.rs b/src/fs/mem.rs index 5743ca626f..b115538bbb 100644 --- a/src/fs/mem.rs +++ b/src/fs/mem.rs @@ -49,19 +49,20 @@ struct RomFileInterface { #[async_trait] impl ObjectInterface for RomFileInterface { async fn poll(&self, event: PollEvent) -> Result { + let mut result: PollEvent = PollEvent::EMPTY; let len = self.inner.read().data.len(); let pos_guard = self.pos.lock(); let pos = *pos_guard; if event.contains(PollEvent::POLLIN) && pos < len { - Ok(PollEvent::POLLIN) + result.insert(PollEvent::POLLIN); } else if event.contains(PollEvent::POLLRDNORM) && pos < len { - Ok(PollEvent::POLLRDNORM) + result.insert(PollEvent::POLLRDNORM); } else if event.contains(PollEvent::POLLRDBAND) && pos < len { - Ok(PollEvent::POLLRDBAND) - } else { - Ok(PollEvent::EMPTY) + result.insert(PollEvent::POLLRDBAND); } + + Ok(result) } fn read(&self, buf: &mut [u8]) -> Result { @@ -132,25 +133,26 @@ pub struct RamFileInterface { #[async_trait] impl ObjectInterface for RamFileInterface { async fn poll(&self, event: PollEvent) -> Result { + let mut result: PollEvent = PollEvent::EMPTY; let len = self.inner.read().data.len(); let pos_guard = self.pos.lock(); let pos = *pos_guard; if event.contains(PollEvent::POLLIN) && pos < len { - Ok(PollEvent::POLLIN) + result.insert(PollEvent::POLLIN); } else if event.contains(PollEvent::POLLRDNORM) && pos < len { - Ok(PollEvent::POLLRDNORM) + result.insert(PollEvent::POLLRDNORM); } else if event.contains(PollEvent::POLLRDBAND) && pos < len { - Ok(PollEvent::POLLRDBAND) + result.insert(PollEvent::POLLRDBAND); } else if event.contains(PollEvent::POLLOUT) { - Ok(PollEvent::POLLOUT) + result.insert(PollEvent::POLLOUT); } else if event.contains(PollEvent::POLLWRNORM) { - Ok(PollEvent::POLLWRNORM) + result.insert(PollEvent::POLLWRNORM); } else if event.contains(PollEvent::POLLWRBAND) { - Ok(PollEvent::POLLWRBAND) - } else { - Ok(PollEvent::EMPTY) + result.insert(PollEvent::POLLWRBAND); } + + Ok(result) } fn read(&self, buf: &mut [u8]) -> Result { From cf47af04e625032f0559675de64b9f979cba82b5 Mon Sep 17 00:00:00 2001 From: Stefan Lankes Date: Fri, 2 Feb 2024 00:05:49 +0100 Subject: [PATCH 06/19] add poll function to the socket interface --- src/fd/socket/tcp.rs | 50 +++++++++++++++++++++++++++++++++++++++++++- src/fd/socket/udp.rs | 43 ++++++++++++++++++++++++++++++++++++- 2 files changed, 91 insertions(+), 2 deletions(-) diff --git a/src/fd/socket/tcp.rs b/src/fd/socket/tcp.rs index 2e4c0e83fe..eb2c0b0987 100644 --- a/src/fd/socket/tcp.rs +++ b/src/fd/socket/tcp.rs @@ -1,8 +1,10 @@ +use alloc::boxed::Box; use core::future; use core::ops::DerefMut; use core::sync::atomic::{AtomicBool, AtomicU16, Ordering}; use core::task::Poll; +use async_trait::async_trait; use smoltcp::iface; use smoltcp::socket::tcp; use smoltcp::time::Duration; @@ -10,7 +12,7 @@ use smoltcp::wire::{IpEndpoint, IpListenEndpoint}; use crate::executor::network::{now, Handle, NetworkState, NIC}; use crate::executor::{block_on, poll_on}; -use crate::fd::{IoCtl, IoError, ObjectInterface, SocketOption}; +use crate::fd::{IoCtl, IoError, ObjectInterface, PollEvent, SocketOption}; use crate::syscalls::net::*; use crate::DEFAULT_KEEP_ALIVE_INTERVAL; @@ -238,7 +240,53 @@ impl Socket { } } +#[async_trait] impl ObjectInterface for Socket { + async fn poll(&self, event: PollEvent) -> Result { + let mut result: PollEvent = PollEvent::EMPTY; + + future::poll_fn(|cx| { + self.with(|socket| match socket.state() { + tcp::State::Closed + | tcp::State::Closing + | tcp::State::CloseWait + | tcp::State::FinWait1 + | tcp::State::FinWait2 + | tcp::State::Listen + | tcp::State::TimeWait => Poll::Ready(Err(IoError::EIO)), + _ => { + if socket.can_send() { + if event.contains(PollEvent::POLLOUT) { + result.insert(PollEvent::POLLOUT); + } else if event.contains(PollEvent::POLLWRNORM) { + result.insert(PollEvent::POLLWRNORM); + } else if event.contains(PollEvent::POLLWRBAND) { + result.insert(PollEvent::POLLWRBAND); + } + } + + if socket.can_recv() { + if event.contains(PollEvent::POLLIN) { + result.insert(PollEvent::POLLIN); + } else if event.contains(PollEvent::POLLRDNORM) { + result.insert(PollEvent::POLLRDNORM); + } else if event.contains(PollEvent::POLLRDBAND) { + result.insert(PollEvent::POLLRDBAND); + } + } + + if result.is_empty() { + socket.register_recv_waker(cx.waker()); + Poll::Pending + } else { + Poll::Ready(Ok(result)) + } + } + }) + }) + .await + } + fn bind(&self, endpoint: IpListenEndpoint) -> Result<(), IoError> { self.port.store(endpoint.port, Ordering::Release); Ok(()) diff --git a/src/fd/socket/udp.rs b/src/fd/socket/udp.rs index eb8e55d2fd..e68091da6a 100644 --- a/src/fd/socket/udp.rs +++ b/src/fd/socket/udp.rs @@ -1,8 +1,10 @@ +use alloc::boxed::Box; use core::future; use core::ops::DerefMut; use core::sync::atomic::{AtomicBool, Ordering}; use core::task::Poll; +use async_trait::async_trait; use crossbeam_utils::atomic::AtomicCell; use smoltcp::socket::udp; use smoltcp::socket::udp::UdpMetadata; @@ -11,7 +13,7 @@ use smoltcp::wire::{IpEndpoint, IpListenEndpoint}; use crate::executor::network::{now, Handle, NetworkState, NIC}; use crate::executor::{block_on, poll_on}; -use crate::fd::{IoCtl, IoError, ObjectInterface}; +use crate::fd::{IoCtl, IoError, ObjectInterface, PollEvent}; #[derive(Debug)] pub struct IPv4; @@ -142,7 +144,46 @@ impl Socket { } } +#[async_trait] impl ObjectInterface for Socket { + async fn poll(&self, event: PollEvent) -> Result { + let mut result: PollEvent = PollEvent::EMPTY; + + future::poll_fn(|cx| { + self.with(|socket| { + if socket.is_open() { + if socket.can_send() { + if event.contains(PollEvent::POLLOUT) { + result.insert(PollEvent::POLLOUT); + } else if event.contains(PollEvent::POLLWRNORM) { + result.insert(PollEvent::POLLWRNORM); + } else if event.contains(PollEvent::POLLWRBAND) { + result.insert(PollEvent::POLLWRBAND); + } + } + + if socket.can_recv() { + if event.contains(PollEvent::POLLIN) { + result.insert(PollEvent::POLLIN); + } else if event.contains(PollEvent::POLLRDNORM) { + result.insert(PollEvent::POLLRDNORM); + } else if event.contains(PollEvent::POLLRDBAND) { + result.insert(PollEvent::POLLRDBAND); + } + } + } + + if result.is_empty() { + socket.register_recv_waker(cx.waker()); + Poll::Pending + } else { + Poll::Ready(Ok(result)) + } + }) + }) + .await + } + fn bind(&self, endpoint: IpListenEndpoint) -> Result<(), IoError> { self.with(|socket| socket.bind(endpoint).map_err(|_| IoError::EADDRINUSE)) } From 2b9a9555a111df0bc980b53a613caea3a4ae4a20 Mon Sep 17 00:00:00 2001 From: Stefan Lankes Date: Fri, 2 Feb 2024 11:05:08 +0100 Subject: [PATCH 07/19] for larger timeouts using a blocking method to wait for the event --- src/fd/mod.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/fd/mod.rs b/src/fd/mod.rs index 6e0c04a50b..87847cf2c9 100644 --- a/src/fd/mod.rs +++ b/src/fd/mod.rs @@ -344,13 +344,18 @@ async fn poll_fds(fds: &mut [PollFd]) -> Result<(), IoError> { pub(crate) fn poll(fds: &mut [PollFd], timeout: i32) -> Result<(), IoError> { if timeout >= 0 { - let timeout = if timeout > 0 { - Some(Duration::from_millis(timeout.try_into().unwrap())) + // for larger timeouts, we block on the async function + if timeout >= 5000 { + block_on( + poll_fds(fds), + Some(Duration::from_millis(timeout.try_into().unwrap())), + ) } else { - None - }; - - poll_on(poll_fds(fds), timeout) + poll_on( + poll_fds(fds), + Some(Duration::from_millis(timeout.try_into().unwrap())), + ) + } } else { block_on(poll_fds(fds), None) } From cafd528de6dce6cdb7b92787c82981feea8f137e Mon Sep 17 00:00:00 2001 From: Stefan Lankes Date: Fri, 2 Feb 2024 11:24:21 +0100 Subject: [PATCH 08/19] if socket is not open, return PollEvent::POLLNVAL according to the POSIX standard, POLLNVAL should be returned, if the socket is already closed. --- src/fd/socket/tcp.rs | 5 ++++- src/fd/socket/udp.rs | 2 ++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/fd/socket/tcp.rs b/src/fd/socket/tcp.rs index eb2c0b0987..e206db5fad 100644 --- a/src/fd/socket/tcp.rs +++ b/src/fd/socket/tcp.rs @@ -253,7 +253,10 @@ impl ObjectInterface for Socket { | tcp::State::FinWait1 | tcp::State::FinWait2 | tcp::State::Listen - | tcp::State::TimeWait => Poll::Ready(Err(IoError::EIO)), + | tcp::State::TimeWait => { + result.insert(PollEvent::POLLNVAL); + Poll::Ready(Ok(result)) + } _ => { if socket.can_send() { if event.contains(PollEvent::POLLOUT) { diff --git a/src/fd/socket/udp.rs b/src/fd/socket/udp.rs index e68091da6a..67055f6221 100644 --- a/src/fd/socket/udp.rs +++ b/src/fd/socket/udp.rs @@ -171,6 +171,8 @@ impl ObjectInterface for Socket { result.insert(PollEvent::POLLRDBAND); } } + } else { + result.insert(PollEvent::POLLNVAL); } if result.is_empty() { From 499f5aa816cde2bcf08f8b3a35db270882194a1e Mon Sep 17 00:00:00 2001 From: Stefan Lankes Date: Fri, 2 Feb 2024 21:41:16 +0100 Subject: [PATCH 09/19] add support of poll for the fuse file system --- src/fs/fuse.rs | 122 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 120 insertions(+), 2 deletions(-) diff --git a/src/fs/fuse.rs b/src/fs/fuse.rs index 802ffb2e6e..25e3fb7dc5 100644 --- a/src/fs/fuse.rs +++ b/src/fs/fuse.rs @@ -5,8 +5,11 @@ use alloc::string::String; use alloc::sync::Arc; use alloc::vec::Vec; use core::mem::MaybeUninit; -use core::{fmt, u32, u8}; +use core::sync::atomic::{AtomicU64, Ordering}; +use core::task::Poll; +use core::{fmt, future, u32, u8}; +use async_trait::async_trait; use hermit_sync::SpinMutex; use crate::alloc::string::ToString; @@ -15,7 +18,7 @@ use crate::arch::kernel::mmio::get_filesystem_driver; #[cfg(feature = "pci")] use crate::drivers::pci::get_filesystem_driver; use crate::drivers::virtio::virtqueue::AsSliceU8; -use crate::fd::IoError; +use crate::fd::{IoError, PollEvent}; use crate::fs::{ self, AccessPermission, DirectoryEntry, FileAttr, NodeKind, ObjectInterface, OpenOption, SeekWhence, VfsNode, @@ -339,6 +342,24 @@ struct fuse_lseek_out { } unsafe impl FuseOut for fuse_lseek_out {} +#[repr(C)] +#[derive(Default, Debug)] +struct fuse_poll_in { + pub fh: u64, + pub kh: u64, + pub flags: u32, + pub events: u32, +} +unsafe impl FuseIn for fuse_poll_in {} + +#[repr(C)] +#[derive(Default, Debug)] +struct fuse_poll_out { + revents: u32, + padding: u32, +} +unsafe impl FuseOut for fuse_poll_out {} + #[repr(u32)] #[derive(Debug, Copy, Clone)] #[allow(non_camel_case_types)] @@ -894,6 +915,62 @@ fn create_release(nid: u64, fh: u64) -> (Box>, Box (Box>, Box>) { + let len = core::mem::size_of::() + core::mem::size_of::(); + let layout = Layout::from_size_align( + len, + core::cmp::max( + core::mem::align_of::(), + core::mem::align_of::(), + ), + ) + .unwrap() + .pad_to_align(); + let cmd = unsafe { + let data = alloc(layout); + let raw = core::ptr::slice_from_raw_parts_mut(data, 0) as *mut Cmd; + (*raw).header = create_in_header::(nid, Opcode::FUSE_POLL); + (*raw).cmd = fuse_poll_in { + fh, + kh, + events: event.bits() as u32, + ..Default::default() + }; + + Box::from_raw(raw) + }; + assert_eq!(layout, Layout::for_value(&*cmd)); + + let len = core::mem::size_of::() + core::mem::size_of::(); + let layout = Layout::from_size_align( + len, + core::cmp::max( + core::mem::align_of::(), + core::mem::align_of::(), + ), + ) + .unwrap() + .pad_to_align(); + let rsp = unsafe { + let data = alloc(layout); + let raw = core::ptr::slice_from_raw_parts_mut(data, 0) as *mut Rsp; + (*raw).header = fuse_out_header { + len: len.try_into().unwrap(), + ..Default::default() + }; + + Box::from_raw(raw) + }; + assert_eq!(layout, Layout::for_value(&*rsp)); + + (cmd, rsp) +} + fn create_mkdir(path: &str, mode: u32) -> (Box>, Box>) { let slice = path.as_bytes(); let len = core::mem::size_of::() @@ -1164,6 +1241,42 @@ impl FuseFileHandleInner { } } + async fn poll(&self, events: PollEvent) -> Result { + static KH: AtomicU64 = AtomicU64::new(0); + let kh = KH.fetch_add(1, Ordering::SeqCst); + + future::poll_fn(|cx| { + if let (Some(nid), Some(fh)) = (self.fuse_nid, self.fuse_fh) { + let (cmd, mut rsp) = create_poll(nid, fh, kh, events); + get_filesystem_driver() + .ok_or(IoError::ENOSYS)? + .lock() + .send_command(cmd.as_ref(), rsp.as_mut()); + + if rsp.header.error < 0 { + Poll::Ready(Err(IoError::EIO)) + } else { + let revents = unsafe { + PollEvent::from_bits(i16::try_from(rsp.rsp.assume_init().revents).unwrap()) + .unwrap() + }; + if !revents.intersects(events) + && !revents.intersects( + PollEvent::POLLERR | PollEvent::POLLNVAL | PollEvent::POLLHUP, + ) { + // the current implementation use polling to wait for an event + // consequently, we have to wakeup the waker, if the the event doesn't arrive + cx.waker().wake_by_ref(); + } + Poll::Ready(Ok(revents)) + } + } else { + Poll::Ready(Ok(PollEvent::POLLERR)) + } + }) + .await + } + fn read(&mut self, buf: &mut [u8]) -> Result { let mut len = buf.len(); if len > MAX_READ_LEN { @@ -1280,7 +1393,12 @@ impl FuseFileHandle { } } +#[async_trait] impl ObjectInterface for FuseFileHandle { + async fn poll(&self, event: PollEvent) -> Result { + self.0.lock().poll(event).await + } + fn read(&self, buf: &mut [u8]) -> Result { self.0.lock().read(buf) } From 07c8fa3c398bcba1fd8983cd66981b7ff3f09c02 Mon Sep 17 00:00:00 2001 From: Stefan Lankes Date: Fri, 2 Feb 2024 23:48:28 +0100 Subject: [PATCH 10/19] using async locks for the IO interface --- Cargo.lock | 47 +++++ Cargo.toml | 1 + src/fd/mod.rs | 18 +- src/fd/socket/tcp.rs | 154 ++++++++-------- src/fs/fuse.rs | 21 +-- src/fs/mem.rs | 411 ++++++++++++++++++++++++------------------- 6 files changed, 385 insertions(+), 267 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 656e37f7a3..43c78b0844 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -123,6 +123,17 @@ dependencies = [ "bitflags 2.4.2", ] +[[package]] +name = "async-lock" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d034b430882f8381900d3fe6f0aaa3ad94f2cb4ac519b429692a1bc2dda4ae7b" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + [[package]] name = "async-trait" version = "0.1.76" @@ -268,6 +279,15 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "concurrent-queue" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "core-foundation-sys" version = "0.8.6" @@ -377,6 +397,26 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "361a90feb7004eca4019fb28352a9465666b24f840f5c3cddf0ff13920590b89" +[[package]] +name = "event-listener" +version = "4.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b215c49b2b248c855fb73579eb1f4f26c38ffdc12973e20e07b91d78d5646e" +dependencies = [ + "concurrent-queue", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "exclusive_cell" version = "0.1.0" @@ -504,6 +544,7 @@ dependencies = [ "align-address", "anyhow", "arm-gic", + "async-lock", "async-trait", "bit_field", "bitflags 2.4.2", @@ -874,6 +915,12 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project-lite" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" + [[package]] name = "plain" version = "0.2.3" diff --git a/Cargo.toml b/Cargo.toml index 1379e17d70..40bfb56724 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -95,6 +95,7 @@ time = { version = "0.3", default-features = false } zerocopy = { version = "0.7", features = ["derive"] } build-time = "0.1.3" async-trait = "0.1.48" +async-lock = { version = "3.3.0", default-features = false } [dependencies.smoltcp] version = "0.11" diff --git a/src/fd/mod.rs b/src/fd/mod.rs index 87847cf2c9..20b5ca5bb9 100644 --- a/src/fd/mod.rs +++ b/src/fd/mod.rs @@ -154,16 +154,28 @@ pub(crate) trait ObjectInterface: Sync + Send + core::fmt::Debug + DynClone { Ok(PollEvent::EMPTY) } + /// `async_read` attempts to read `len` bytes from the object references + /// by the descriptor + async fn async_read(&self, _buf: &mut [u8]) -> Result { + Err(IoError::ENOSYS) + } + /// `read` attempts to read `len` bytes from the object references /// by the descriptor - fn read(&self, _buf: &mut [u8]) -> Result { + fn read(&self, buf: &mut [u8]) -> Result { + block_on(self.async_read(buf), None) + } + + /// `async_write` attempts to write `len` bytes to the object references + /// by the descriptor + async fn async_write(&self, _buf: &[u8]) -> Result { Err(IoError::ENOSYS) } /// `write` attempts to write `len` bytes to the object references /// by the descriptor - fn write(&self, _buf: &[u8]) -> Result { - Err(IoError::ENOSYS) + fn write(&self, buf: &[u8]) -> Result { + block_on(self.async_write(buf), None) } /// `lseek` function repositions the offset of the file descriptor fildes diff --git a/src/fd/socket/tcp.rs b/src/fd/socket/tcp.rs index e206db5fad..3b089b9f73 100644 --- a/src/fd/socket/tcp.rs +++ b/src/fd/socket/tcp.rs @@ -63,83 +63,6 @@ impl Socket { result } - // TODO: Remove allow once fixed: - // https://github.com/rust-lang/rust-clippy/issues/11380 - #[allow(clippy::needless_pass_by_ref_mut)] - async fn async_read(&self, buffer: &mut [u8]) -> Result { - future::poll_fn(|cx| { - self.with(|socket| match socket.state() { - tcp::State::Closed | tcp::State::Closing | tcp::State::CloseWait => { - Poll::Ready(Ok(0)) - } - tcp::State::FinWait1 - | tcp::State::FinWait2 - | tcp::State::Listen - | tcp::State::TimeWait => Poll::Ready(Err(IoError::EIO)), - _ => { - if socket.can_recv() { - Poll::Ready( - socket - .recv(|data| { - let len = core::cmp::min(buffer.len(), data.len()); - buffer[..len].copy_from_slice(&data[..len]); - (len, len) - }) - .map_err(|_| IoError::EIO), - ) - } else { - socket.register_recv_waker(cx.waker()); - Poll::Pending - } - } - }) - }) - .await - } - - async fn async_write(&self, buffer: &[u8]) -> Result { - let mut pos: usize = 0; - - while pos < buffer.len() { - let n = future::poll_fn(|cx| { - self.with(|socket| { - match socket.state() { - tcp::State::Closed | tcp::State::Closing | tcp::State::CloseWait => { - Poll::Ready(Ok(0)) - } - tcp::State::FinWait1 - | tcp::State::FinWait2 - | tcp::State::Listen - | tcp::State::TimeWait => Poll::Ready(Err(IoError::EIO)), - _ => { - if socket.can_send() { - Poll::Ready( - socket.send_slice(&buffer[pos..]).map_err(|_| IoError::EIO), - ) - } else if pos > 0 { - // we already send some data => return 0 as signal to stop the - // async write - Poll::Ready(Ok(0)) - } else { - socket.register_send_waker(cx.waker()); - Poll::Pending - } - } - } - }) - }) - .await?; - - if n == 0 { - break; - } - - pos += n; - } - - Ok(pos) - } - async fn async_connect(&self, endpoint: IpEndpoint) -> Result<(), IoError> { self.with_context(|socket, cx| socket.connect(cx, endpoint, get_ephemeral_port())) .map_err(|_| IoError::EIO)?; @@ -290,6 +213,83 @@ impl ObjectInterface for Socket { .await } + // TODO: Remove allow once fixed: + // https://github.com/rust-lang/rust-clippy/issues/11380 + #[allow(clippy::needless_pass_by_ref_mut)] + async fn async_read(&self, buffer: &mut [u8]) -> Result { + future::poll_fn(|cx| { + self.with(|socket| match socket.state() { + tcp::State::Closed | tcp::State::Closing | tcp::State::CloseWait => { + Poll::Ready(Ok(0)) + } + tcp::State::FinWait1 + | tcp::State::FinWait2 + | tcp::State::Listen + | tcp::State::TimeWait => Poll::Ready(Err(IoError::EIO)), + _ => { + if socket.can_recv() { + Poll::Ready( + socket + .recv(|data| { + let len = core::cmp::min(buffer.len(), data.len()); + buffer[..len].copy_from_slice(&data[..len]); + (len, len) + }) + .map_err(|_| IoError::EIO), + ) + } else { + socket.register_recv_waker(cx.waker()); + Poll::Pending + } + } + }) + }) + .await + } + + async fn async_write(&self, buffer: &[u8]) -> Result { + let mut pos: usize = 0; + + while pos < buffer.len() { + let n = future::poll_fn(|cx| { + self.with(|socket| { + match socket.state() { + tcp::State::Closed | tcp::State::Closing | tcp::State::CloseWait => { + Poll::Ready(Ok(0)) + } + tcp::State::FinWait1 + | tcp::State::FinWait2 + | tcp::State::Listen + | tcp::State::TimeWait => Poll::Ready(Err(IoError::EIO)), + _ => { + if socket.can_send() { + Poll::Ready( + socket.send_slice(&buffer[pos..]).map_err(|_| IoError::EIO), + ) + } else if pos > 0 { + // we already send some data => return 0 as signal to stop the + // async write + Poll::Ready(Ok(0)) + } else { + socket.register_send_waker(cx.waker()); + Poll::Pending + } + } + } + }) + }) + .await?; + + if n == 0 { + break; + } + + pos += n; + } + + Ok(pos) + } + fn bind(&self, endpoint: IpListenEndpoint) -> Result<(), IoError> { self.port.store(endpoint.port, Ordering::Release); Ok(()) diff --git a/src/fs/fuse.rs b/src/fs/fuse.rs index 25e3fb7dc5..7ef39732c4 100644 --- a/src/fs/fuse.rs +++ b/src/fs/fuse.rs @@ -9,8 +9,8 @@ use core::sync::atomic::{AtomicU64, Ordering}; use core::task::Poll; use core::{fmt, future, u32, u8}; +use async_lock::Mutex; use async_trait::async_trait; -use hermit_sync::SpinMutex; use crate::alloc::string::ToString; #[cfg(not(feature = "pci"))] @@ -18,6 +18,7 @@ use crate::arch::kernel::mmio::get_filesystem_driver; #[cfg(feature = "pci")] use crate::drivers::pci::get_filesystem_driver; use crate::drivers::virtio::virtqueue::AsSliceU8; +use crate::executor::block_on; use crate::fd::{IoError, PollEvent}; use crate::fs::{ self, AccessPermission, DirectoryEntry, FileAttr, NodeKind, ObjectInterface, OpenOption, @@ -1385,30 +1386,30 @@ impl Drop for FuseFileHandleInner { } #[derive(Debug)] -struct FuseFileHandle(pub Arc>); +struct FuseFileHandle(pub Arc>); impl FuseFileHandle { pub fn new() -> Self { - Self(Arc::new(SpinMutex::new(FuseFileHandleInner::new()))) + Self(Arc::new(Mutex::new(FuseFileHandleInner::new()))) } } #[async_trait] impl ObjectInterface for FuseFileHandle { async fn poll(&self, event: PollEvent) -> Result { - self.0.lock().poll(event).await + self.0.lock().await.poll(event).await } - fn read(&self, buf: &mut [u8]) -> Result { - self.0.lock().read(buf) + async fn async_read(&self, buf: &mut [u8]) -> Result { + self.0.lock().await.read(buf) } - fn write(&self, buf: &[u8]) -> Result { - self.0.lock().write(buf) + async fn async_write(&self, buf: &[u8]) -> Result { + self.0.lock().await.write(buf) } fn lseek(&self, offset: isize, whence: SeekWhence) -> Result { - self.0.lock().lseek(offset, whence) + block_on(async { self.0.lock().await.lseek(offset, whence) }, None) } } @@ -1601,7 +1602,7 @@ impl VfsNode for FuseDirectory { // 1.FUSE_INIT to create session // Already done - let mut file_guard = file.0.lock(); + let mut file_guard = block_on(async { Ok(file.0.lock().await) }, None)?; // Differentiate between opening and creating new file, since fuse does not support O_CREAT on open. if !opt.contains(OpenOption::O_CREAT) { diff --git a/src/fs/mem.rs b/src/fs/mem.rs index b115538bbb..eda319101c 100644 --- a/src/fs/mem.rs +++ b/src/fs/mem.rs @@ -16,10 +16,11 @@ use alloc::sync::Arc; use alloc::vec::Vec; use core::slice; +use async_lock::{Mutex, RwLock}; use async_trait::async_trait; -use hermit_sync::{RwSpinLock, SpinMutex}; use crate::arch; +use crate::executor::block_on; use crate::fd::{AccessPermission, IoError, ObjectInterface, OpenOption, PollEvent}; use crate::fs::{DirectoryEntry, FileAttr, NodeKind, VfsNode}; @@ -41,17 +42,17 @@ impl RomFileInner { #[derive(Debug, Clone)] struct RomFileInterface { /// Position within the file - pos: Arc>, + pos: Arc>, /// File content - inner: Arc>, + inner: Arc>, } #[async_trait] impl ObjectInterface for RomFileInterface { async fn poll(&self, event: PollEvent) -> Result { let mut result: PollEvent = PollEvent::EMPTY; - let len = self.inner.read().data.len(); - let pos_guard = self.pos.lock(); + let len = self.inner.read().await.data.len(); + let pos_guard = self.pos.lock().await; let pos = *pos_guard; if event.contains(PollEvent::POLLIN) && pos < len { @@ -65,16 +66,16 @@ impl ObjectInterface for RomFileInterface { Ok(result) } - fn read(&self, buf: &mut [u8]) -> Result { + async fn async_read(&self, buf: &mut [u8]) -> Result { { let microseconds = arch::kernel::systemtime::now_micros(); - let mut guard = self.inner.write(); + let mut guard = self.inner.write().await; guard.attr.st_atime = microseconds / 1_000_000; guard.attr.st_atime_nsec = (microseconds % 1_000_000) * 1000; } - let vec = self.inner.read().data; - let mut pos_guard = self.pos.lock(); + let vec = self.inner.read().await.data; + let mut pos_guard = self.pos.lock().await; let pos = *pos_guard; if pos >= vec.len() { @@ -95,15 +96,15 @@ impl ObjectInterface for RomFileInterface { } impl RomFileInterface { - pub fn new(inner: Arc>) -> Self { + pub fn new(inner: Arc>) -> Self { Self { - pos: Arc::new(SpinMutex::new(0)), + pos: Arc::new(Mutex::new(0)), inner, } } pub fn len(&self) -> usize { - self.inner.read().data.len() + block_on(async { Ok(self.inner.read().await.data.len()) }, None).unwrap() } } @@ -125,17 +126,17 @@ impl RamFileInner { #[derive(Debug, Clone)] pub struct RamFileInterface { /// Position within the file - pos: Arc>, + pos: Arc>, /// File content - inner: Arc>, + inner: Arc>, } #[async_trait] impl ObjectInterface for RamFileInterface { async fn poll(&self, event: PollEvent) -> Result { let mut result: PollEvent = PollEvent::EMPTY; - let len = self.inner.read().data.len(); - let pos_guard = self.pos.lock(); + let len = self.inner.read().await.data.len(); + let pos_guard = self.pos.lock().await; let pos = *pos_guard; if event.contains(PollEvent::POLLIN) && pos < len { @@ -155,16 +156,16 @@ impl ObjectInterface for RamFileInterface { Ok(result) } - fn read(&self, buf: &mut [u8]) -> Result { + async fn async_read(&self, buf: &mut [u8]) -> Result { { let microseconds = arch::kernel::systemtime::now_micros(); - let mut guard = self.inner.write(); + let mut guard = self.inner.write().await; guard.attr.st_atime = microseconds / 1_000_000; guard.attr.st_atime_nsec = (microseconds % 1_000_000) * 1000; } - let guard = self.inner.read(); - let mut pos_guard = self.pos.lock(); + let guard = self.inner.read().await; + let mut pos_guard = self.pos.lock().await; let pos = *pos_guard; if pos >= guard.data.len() { @@ -183,10 +184,10 @@ impl ObjectInterface for RamFileInterface { Ok(len) } - fn write(&self, buf: &[u8]) -> Result { + async fn async_write(&self, buf: &[u8]) -> Result { let microseconds = arch::kernel::systemtime::now_micros(); - let mut guard = self.inner.write(); - let mut pos_guard = self.pos.lock(); + let mut guard = self.inner.write().await; + let mut pos_guard = self.pos.lock().await; let pos = *pos_guard; if pos + buf.len() > guard.data.len() { @@ -208,21 +209,21 @@ impl ObjectInterface for RamFileInterface { } impl RamFileInterface { - pub fn new(inner: Arc>) -> Self { + pub fn new(inner: Arc>) -> Self { Self { - pos: Arc::new(SpinMutex::new(0)), + pos: Arc::new(Mutex::new(0)), inner, } } pub fn len(&self) -> usize { - self.inner.read().data.len() + block_on(async { Ok(self.inner.read().await.data.len()) }, None).unwrap() } } #[derive(Debug)] pub(crate) struct RomFile { - data: Arc>, + data: Arc>, } impl VfsNode for RomFile { @@ -235,7 +236,7 @@ impl VfsNode for RomFile { } fn get_file_attributes(&self) -> Result { - Ok(self.data.read().attr) + block_on(async { Ok(self.data.read().await.attr) }, None) } fn traverse_lstat(&self, components: &mut Vec<&str>) -> Result { @@ -271,14 +272,14 @@ impl RomFile { }; Self { - data: unsafe { Arc::new(RwSpinLock::new(RomFileInner::new(ptr, length, attr))) }, + data: unsafe { Arc::new(RwLock::new(RomFileInner::new(ptr, length, attr))) }, } } } #[derive(Debug, Clone)] pub(crate) struct RamFile { - data: Arc>, + data: Arc>, } impl VfsNode for RamFile { @@ -291,7 +292,7 @@ impl VfsNode for RamFile { } fn get_file_attributes(&self) -> Result { - Ok(self.data.read().attr) + block_on(async { Ok(self.data.read().await.attr) }, None) } fn traverse_lstat(&self, components: &mut Vec<&str>) -> Result { @@ -326,16 +327,15 @@ impl RamFile { }; Self { - data: Arc::new(RwSpinLock::new(RamFileInner::new(attr))), + data: Arc::new(RwLock::new(RamFileInner::new(attr))), } } } #[derive(Debug)] pub(crate) struct MemDirectory { - inner: Arc< - RwSpinLock>>, - >, + inner: + Arc>>>, attr: FileAttr, } @@ -344,7 +344,7 @@ impl MemDirectory { let microseconds = arch::kernel::systemtime::now_micros(); Self { - inner: Arc::new(RwSpinLock::new(BTreeMap::new())), + inner: Arc::new(RwLock::new(BTreeMap::new())), attr: FileAttr { st_mode: mode | AccessPermission::S_IFDIR, st_atime: microseconds / 1_000_000, @@ -357,6 +357,44 @@ impl MemDirectory { }, } } + + async fn async_traverse_open( + &self, + components: &mut Vec<&str>, + opt: OpenOption, + mode: AccessPermission, + ) -> Result, IoError> { + if let Some(component) = components.pop() { + let node_name = String::from(component); + + if components.is_empty() { + let mut guard = self.inner.write().await; + if opt.contains(OpenOption::O_CREAT) || opt.contains(OpenOption::O_CREAT) { + if guard.get(&node_name).is_some() { + return Err(IoError::EEXIST); + } else { + let file = Box::new(RamFile::new(mode)); + guard.insert(node_name, file.clone()); + return Ok(Arc::new(RamFileInterface::new(file.data.clone()))); + } + } else if let Some(file) = guard.get(&node_name) { + if file.get_kind() == NodeKind::File { + return file.get_object(); + } else { + return Err(IoError::ENOENT); + } + } else { + return Err(IoError::ENOENT); + } + } + + if let Some(directory) = self.inner.read().await.get(&node_name) { + return directory.traverse_open(components, opt, mode); + } + } + + Err(IoError::ENOENT) + } } impl VfsNode for MemDirectory { @@ -373,125 +411,160 @@ impl VfsNode for MemDirectory { components: &mut Vec<&str>, mode: AccessPermission, ) -> Result<(), IoError> { - if let Some(component) = components.pop() { - let node_name = String::from(component); + block_on( + async { + if let Some(component) = components.pop() { + let node_name = String::from(component); - if let Some(directory) = self.inner.read().get(&node_name) { - return directory.traverse_mkdir(components, mode); - } + if let Some(directory) = self.inner.read().await.get(&node_name) { + return directory.traverse_mkdir(components, mode); + } - if components.is_empty() { - self.inner - .write() - .insert(node_name, Box::new(MemDirectory::new(mode))); - return Ok(()); - } - } + if components.is_empty() { + self.inner + .write() + .await + .insert(node_name, Box::new(MemDirectory::new(mode))); + return Ok(()); + } + } - Err(IoError::EBADF) + Err(IoError::EBADF) + }, + None, + ) } fn traverse_rmdir(&self, components: &mut Vec<&str>) -> Result<(), IoError> { - if let Some(component) = components.pop() { - let node_name = String::from(component); + block_on( + async { + if let Some(component) = components.pop() { + let node_name = String::from(component); - if components.is_empty() { - let mut guard = self.inner.write(); + if let Some(directory) = self.inner.read().await.get(&node_name) { + return directory.traverse_rmdir(components); + } - let obj = guard.remove(&node_name).ok_or(IoError::ENOENT)?; - if obj.get_kind() == NodeKind::Directory { - return Ok(()); - } else { - guard.insert(node_name, obj); - return Err(IoError::ENOTDIR); + if components.is_empty() { + let mut guard = self.inner.write().await; + + let obj = guard.remove(&node_name).ok_or(IoError::ENOENT)?; + if obj.get_kind() == NodeKind::Directory { + return Ok(()); + } else { + guard.insert(node_name, obj); + return Err(IoError::ENOTDIR); + } + } } - } else if let Some(directory) = self.inner.read().get(&node_name) { - return directory.traverse_rmdir(components); - } - } - Err(IoError::EBADF) + Err(IoError::EBADF) + }, + None, + ) } fn traverse_unlink(&self, components: &mut Vec<&str>) -> Result<(), IoError> { - if let Some(component) = components.pop() { - let node_name = String::from(component); + block_on( + async { + if let Some(component) = components.pop() { + let node_name = String::from(component); - if components.is_empty() { - let mut guard = self.inner.write(); + if let Some(directory) = self.inner.read().await.get(&node_name) { + return directory.traverse_unlink(components); + } - let obj = guard.remove(&node_name).ok_or(IoError::ENOENT)?; - if obj.get_kind() == NodeKind::File { - return Ok(()); - } else { - guard.insert(node_name, obj); - return Err(IoError::ENOENT); + if components.is_empty() { + let mut guard = self.inner.write().await; + + let obj = guard.remove(&node_name).ok_or(IoError::ENOENT)?; + if obj.get_kind() == NodeKind::Directory { + guard.insert(node_name, obj); + return Err(IoError::EISDIR); + } else { + return Ok(()); + } + } } - } else if let Some(directory) = self.inner.read().get(&node_name) { - return directory.traverse_unlink(components); - } - } - Err(IoError::EBADF) + Err(IoError::EBADF) + }, + None, + ) } fn traverse_readdir(&self, components: &mut Vec<&str>) -> Result, IoError> { - if let Some(component) = components.pop() { - let node_name = String::from(component); + block_on( + async { + if let Some(component) = components.pop() { + let node_name = String::from(component); - if let Some(directory) = self.inner.read().get(&node_name) { - directory.traverse_readdir(components) - } else { - Err(IoError::EBADF) - } - } else { - let mut entries: Vec = Vec::new(); - for name in self.inner.read().keys() { - entries.push(DirectoryEntry::new(name.to_string())); - } + if let Some(directory) = self.inner.read().await.get(&node_name) { + directory.traverse_readdir(components) + } else { + Err(IoError::EBADF) + } + } else { + let mut entries: Vec = Vec::new(); + for name in self.inner.read().await.keys() { + entries.push(DirectoryEntry::new(name.to_string())); + } - Ok(entries) - } + Ok(entries) + } + }, + None, + ) } fn traverse_lstat(&self, components: &mut Vec<&str>) -> Result { - if let Some(component) = components.pop() { - let node_name = String::from(component); + block_on( + async { + if let Some(component) = components.pop() { + let node_name = String::from(component); + + if components.is_empty() { + if let Some(node) = self.inner.read().await.get(&node_name) { + return node.get_file_attributes(); + } + } - if components.is_empty() { - if let Some(node) = self.inner.read().get(&node_name) { - return node.get_file_attributes(); + if let Some(directory) = self.inner.read().await.get(&node_name) { + directory.traverse_lstat(components) + } else { + Err(IoError::EBADF) + } + } else { + Err(IoError::ENOSYS) } - } - - if let Some(directory) = self.inner.read().get(&node_name) { - directory.traverse_lstat(components) - } else { - Err(IoError::EBADF) - } - } else { - Err(IoError::ENOSYS) - } + }, + None, + ) } fn traverse_stat(&self, components: &mut Vec<&str>) -> Result { - if let Some(component) = components.pop() { - let node_name = String::from(component); + block_on( + async { + if let Some(component) = components.pop() { + let node_name = String::from(component); + + if components.is_empty() { + if let Some(node) = self.inner.read().await.get(&node_name) { + return node.get_file_attributes(); + } + } - if components.is_empty() { - if let Some(node) = self.inner.read().get(&node_name) { - return node.get_file_attributes(); + if let Some(directory) = self.inner.read().await.get(&node_name) { + directory.traverse_stat(components) + } else { + Err(IoError::EBADF) + } + } else { + Err(IoError::ENOSYS) } - } - - if let Some(directory) = self.inner.read().get(&node_name) { - directory.traverse_stat(components) - } else { - Err(IoError::EBADF) - } - } else { - Err(IoError::ENOSYS) - } + }, + None, + ) } fn traverse_mount( @@ -499,20 +572,25 @@ impl VfsNode for MemDirectory { components: &mut Vec<&str>, obj: Box, ) -> Result<(), IoError> { - if let Some(component) = components.pop() { - let node_name = String::from(component); + block_on( + async { + if let Some(component) = components.pop() { + let node_name = String::from(component); - if let Some(directory) = self.inner.read().get(&node_name) { - return directory.traverse_mount(components, obj); - } + if let Some(directory) = self.inner.read().await.get(&node_name) { + return directory.traverse_mount(components, obj); + } - if components.is_empty() { - self.inner.write().insert(node_name, obj); - return Ok(()); - } - } + if components.is_empty() { + self.inner.write().await.insert(node_name, obj); + return Ok(()); + } + } - Err(IoError::EBADF) + Err(IoError::EBADF) + }, + None, + ) } fn traverse_open( @@ -521,36 +599,7 @@ impl VfsNode for MemDirectory { opt: OpenOption, mode: AccessPermission, ) -> Result, IoError> { - if let Some(component) = components.pop() { - let node_name = String::from(component); - - if components.is_empty() { - let mut guard = self.inner.write(); - if opt.contains(OpenOption::O_CREAT) || opt.contains(OpenOption::O_CREAT) { - if guard.get(&node_name).is_some() { - return Err(IoError::EEXIST); - } else { - let file = Box::new(RamFile::new(mode)); - guard.insert(node_name, file.clone()); - return Ok(Arc::new(RamFileInterface::new(file.data.clone()))); - } - } else if let Some(file) = guard.get(&node_name) { - if file.get_kind() == NodeKind::File { - return file.get_object(); - } else { - return Err(IoError::ENOENT); - } - } else { - return Err(IoError::ENOENT); - } - } - - if let Some(directory) = self.inner.read().get(&node_name) { - return directory.traverse_open(components, opt, mode); - } - } - - Err(IoError::ENOENT) + block_on(self.async_traverse_open(components, opt, mode), None) } fn traverse_create_file( @@ -560,20 +609,28 @@ impl VfsNode for MemDirectory { length: usize, mode: AccessPermission, ) -> Result<(), IoError> { - if let Some(component) = components.pop() { - let name = String::from(component); - - if components.is_empty() { - let file = unsafe { RomFile::new(ptr, length, mode) }; - self.inner.write().insert(name.to_string(), Box::new(file)); - return Ok(()); - } + block_on( + async { + if let Some(component) = components.pop() { + let name = String::from(component); + + if components.is_empty() { + let file = unsafe { RomFile::new(ptr, length, mode) }; + self.inner + .write() + .await + .insert(name.to_string(), Box::new(file)); + return Ok(()); + } - if let Some(directory) = self.inner.read().get(&name) { - return directory.traverse_create_file(components, ptr, length, mode); - } - } + if let Some(directory) = self.inner.read().await.get(&name) { + return directory.traverse_create_file(components, ptr, length, mode); + } + } - Err(IoError::ENOENT) + Err(IoError::ENOENT) + }, + None, + ) } } From 799103ca66c5f81b4af0d2c9f1cb2757908498e1 Mon Sep 17 00:00:00 2001 From: Stefan Lankes Date: Sat, 3 Feb 2024 08:52:52 +0100 Subject: [PATCH 11/19] using a async_lock to protect the object map This removes a blocking function within the async function `poll_fds`. --- Cargo.lock | 10 ---- Cargo.toml | 1 - src/fd/mod.rs | 152 ++++++++++++++++++++++++++++++-------------------- src/lib.rs | 2 +- 4 files changed, 93 insertions(+), 72 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 43c78b0844..1d35894cd5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -567,7 +567,6 @@ dependencies = [ "num-traits", "pci-ids", "pci_types", - "pflock", "qemu-exit", "rand_chacha", "riscv", @@ -868,15 +867,6 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" -[[package]] -name = "pflock" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cb30d7f84b469117e8b95025621135ec4b32c8a987e7e40e38d3654e0b2e47e" -dependencies = [ - "lock_api", -] - [[package]] name = "phf" version = "0.11.2" diff --git a/Cargo.toml b/Cargo.toml index 40bfb56724..84a338819d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,7 +85,6 @@ num-derive = "0.4" num-traits = { version = "0.2", default-features = false } pci-ids = { version = "0.2", optional = true } pci_types = { version = "0.6" } -pflock = "0.2" rand_chacha = { version = "0.3", default-features = false } shell-words = { version = "1.1", default-features = false } smallvec = { version = "1", features = ["const_new"] } diff --git a/src/fd/mod.rs b/src/fd/mod.rs index 20b5ca5bb9..f5900a7cda 100644 --- a/src/fd/mod.rs +++ b/src/fd/mod.rs @@ -62,12 +62,13 @@ pub(crate) enum IoCtl { pub(crate) type FileDescriptor = i32; /// Mapping between file descriptor and the referenced object -static OBJECT_MAP: pflock::PFLock, RandomState>> = - pflock::PFLock::new(HashMap::< - FileDescriptor, - Arc, - RandomState, - >::with_hasher(RandomState::with_seeds(0, 0, 0, 0))); +static OBJECT_MAP: async_lock::RwLock< + HashMap, RandomState>, +> = async_lock::RwLock::new(HashMap::< + FileDescriptor, + Arc, + RandomState, +>::with_hasher(RandomState::with_seeds(0, 0, 0, 0))); /// Atomic counter to determine the next unused file descriptor pub(crate) static FD_COUNTER: AtomicI32 = AtomicI32::new(3); @@ -306,11 +307,16 @@ pub(crate) fn open( let fs = fs::FILESYSTEM.get().unwrap(); if let Ok(file) = fs.open(name, flags, mode) { let fd = FD_COUNTER.fetch_add(1, Ordering::SeqCst); - if OBJECT_MAP.write().try_insert(fd, file).is_err() { - Err(IoError::EINVAL) - } else { - Ok(fd as FileDescriptor) - } + block_on( + async { + if OBJECT_MAP.write().await.try_insert(fd, file).is_err() { + Err(IoError::EINVAL) + } else { + Ok(fd as FileDescriptor) + } + }, + None, + ) } else { Err(IoError::EINVAL) } @@ -334,7 +340,8 @@ async fn poll_fds(fds: &mut [PollFd]) -> Result<(), IoError> { for i in &mut *fds { let fd = i.fd; - if let Ok(obj) = get_object(fd) { + let mut pinned_obj = core::pin::pin!(async_get_object(fd)); + if let Ready(Ok(obj)) = pinned_obj.as_mut().poll(cx) { let mut pinned = core::pin::pin!(obj.poll(i.events)); if let Ready(Ok(e)) = pinned.as_mut().poll(cx) { ready = true; @@ -374,14 +381,18 @@ pub(crate) fn poll(fds: &mut [PollFd], timeout: i32) -> Result<(), IoError> { } pub(crate) fn get_object(fd: FileDescriptor) -> Result, IoError> { - Ok((*(OBJECT_MAP.read().get(&fd).ok_or(IoError::EINVAL)?)).clone()) + block_on(async_get_object(fd), None) +} + +async fn async_get_object(fd: FileDescriptor) -> Result, IoError> { + Ok((*(OBJECT_MAP.read().await.get(&fd).ok_or(IoError::EINVAL)?)).clone()) } pub(crate) fn insert_object( fd: FileDescriptor, obj: Arc, ) -> Option> { - OBJECT_MAP.write().insert(fd, obj) + block_on(async { Ok(OBJECT_MAP.write().await.insert(fd, obj)) }, None).unwrap() } // The dup system call allocates a new file descriptor that refers @@ -389,56 +400,77 @@ pub(crate) fn insert_object( // file descriptor number is guaranteed to be the lowest-numbered // file descriptor that was unused in the calling process. pub(crate) fn dup_object(fd: FileDescriptor) -> Result { - let mut guard = OBJECT_MAP.write(); - let obj = (*(guard.get(&fd).ok_or(IoError::EINVAL)?)).clone(); - - let new_fd = || -> i32 { - for i in 3..FD_COUNTER.load(Ordering::SeqCst) { - if !guard.contains_key(&i) { - return i; + block_on( + async { + let mut guard = OBJECT_MAP.write().await; + let obj = (*(guard.get(&fd).ok_or(IoError::EINVAL)?)).clone(); + + let new_fd = || -> i32 { + for i in 3..FD_COUNTER.load(Ordering::SeqCst) { + if !guard.contains_key(&i) { + return i; + } + } + FD_COUNTER.fetch_add(1, Ordering::SeqCst) + }; + + let fd = new_fd(); + if guard.try_insert(fd, obj).is_err() { + Err(IoError::EMFILE) + } else { + Ok(fd as FileDescriptor) } - } - FD_COUNTER.fetch_add(1, Ordering::SeqCst) - }; - - let fd = new_fd(); - if guard.try_insert(fd, obj).is_err() { - Err(IoError::EMFILE) - } else { - Ok(fd as FileDescriptor) - } + }, + None, + ) } pub(crate) fn remove_object(fd: FileDescriptor) -> Result, IoError> { - if fd <= 2 { - Err(IoError::EINVAL) - } else { - let obj = OBJECT_MAP.write().remove(&fd).ok_or(IoError::EINVAL)?; - Ok(obj) - } + block_on( + async { + if fd <= 2 { + Err(IoError::EINVAL) + } else { + let obj = OBJECT_MAP + .write() + .await + .remove(&fd) + .ok_or(IoError::EINVAL)?; + Ok(obj) + } + }, + None, + ) } -pub(crate) fn init() { - let mut guard = OBJECT_MAP.write(); - if env::is_uhyve() { - guard - .try_insert(STDIN_FILENO, Arc::new(UhyveStdin::new())) - .unwrap(); - guard - .try_insert(STDOUT_FILENO, Arc::new(UhyveStdout::new())) - .unwrap(); - guard - .try_insert(STDERR_FILENO, Arc::new(UhyveStderr::new())) - .unwrap(); - } else { - guard - .try_insert(STDIN_FILENO, Arc::new(GenericStdin::new())) - .unwrap(); - guard - .try_insert(STDOUT_FILENO, Arc::new(GenericStdout::new())) - .unwrap(); - guard - .try_insert(STDERR_FILENO, Arc::new(GenericStderr::new())) - .unwrap(); - } +pub(crate) fn init() -> Result<(), IoError> { + block_on( + async { + let mut guard = OBJECT_MAP.write().await; + if env::is_uhyve() { + guard + .try_insert(STDIN_FILENO, Arc::new(UhyveStdin::new())) + .map_err(|_| IoError::EIO)?; + guard + .try_insert(STDOUT_FILENO, Arc::new(UhyveStdout::new())) + .map_err(|_| IoError::EIO)?; + guard + .try_insert(STDERR_FILENO, Arc::new(UhyveStderr::new())) + .map_err(|_| IoError::EIO)?; + } else { + guard + .try_insert(STDIN_FILENO, Arc::new(GenericStdin::new())) + .map_err(|_| IoError::EIO)?; + guard + .try_insert(STDOUT_FILENO, Arc::new(GenericStdout::new())) + .map_err(|_| IoError::EIO)?; + guard + .try_insert(STDERR_FILENO, Arc::new(GenericStderr::new())) + .map_err(|_| IoError::EIO)?; + } + + Ok(()) + }, + None, + ) } diff --git a/src/lib.rs b/src/lib.rs index d9e7980cff..2d5de36e6c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -283,7 +283,7 @@ extern "C" fn initd(_arg: usize) { riscv64::kernel::init_drivers(); syscalls::init(); - fd::init(); + fd::init().unwrap(); fs::init(); // Get the application arguments and environment variables. From 96f498b7116f94f1fbd0f49f7a806d5032710180 Mon Sep 17 00:00:00 2001 From: Stefan Lankes Date: Sun, 4 Feb 2024 12:38:38 +0100 Subject: [PATCH 12/19] drop guard to reduce the blocking time of the NIC --- src/syscalls/net.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/syscalls/net.rs b/src/syscalls/net.rs index dc33909d5d..3d90bc9856 100644 --- a/src/syscalls/net.rs +++ b/src/syscalls/net.rs @@ -267,6 +267,7 @@ extern "C" fn __sys_socket(domain: i32, type_: i32, protocol: i32) -> i32 { #[cfg(feature = "udp")] if type_ == SOCK_DGRAM { let handle = nic.create_udp_handle().unwrap(); + drop(guard); let socket = udp::Socket::new(handle); insert_object(fd, Arc::new(socket)); @@ -277,7 +278,9 @@ extern "C" fn __sys_socket(domain: i32, type_: i32, protocol: i32) -> i32 { #[cfg(feature = "tcp")] if type_ == SOCK_STREAM { let handle = nic.create_tcp_handle().unwrap(); + drop(guard); let socket = tcp::Socket::new(handle); + insert_object(fd, Arc::new(socket)); return fd; From 03cd8f5b0f545afa21281ed1a8e8c16519fafb37 Mon Sep 17 00:00:00 2001 From: Stefan Lankes Date: Sun, 4 Feb 2024 12:39:26 +0100 Subject: [PATCH 13/19] avoiding unneeded blocking in cace, the NIC is already used by a task, the calling of poll function isn't required. --- src/executor/network.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/executor/network.rs b/src/executor/network.rs index 907a6ac5fe..ed8fa1fa19 100644 --- a/src/executor/network.rs +++ b/src/executor/network.rs @@ -87,12 +87,19 @@ pub(crate) fn now() -> Instant { } async fn network_run() { - future::poll_fn(|_cx| match NIC.lock().deref_mut() { - NetworkState::Initialized(nic) => { - nic.poll_common(now()); + future::poll_fn(|_cx| { + if let Some(mut guard) = NIC.try_lock() { + match guard.deref_mut() { + NetworkState::Initialized(nic) => { + nic.poll_common(now()); + Poll::Pending + } + _ => Poll::Ready(()), + } + } else { + // another task is already using the NIC => don't check Poll::Pending } - _ => Poll::Ready(()), }) .await } From db3d8e57d93db7fc574db6ab79d4ebea3c4a4d34 Mon Sep 17 00:00:00 2001 From: Stefan Lankes Date: Sun, 4 Feb 2024 12:40:55 +0100 Subject: [PATCH 14/19] improve the readybility and the error messages --- src/fd/mod.rs | 18 ++++++++++++++---- src/lib.rs | 2 +- src/syscalls/net.rs | 8 ++++---- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/src/fd/mod.rs b/src/fd/mod.rs index f5900a7cda..342a166f66 100644 --- a/src/fd/mod.rs +++ b/src/fd/mod.rs @@ -380,19 +380,29 @@ pub(crate) fn poll(fds: &mut [PollFd], timeout: i32) -> Result<(), IoError> { } } +#[inline] +async fn async_get_object(fd: FileDescriptor) -> Result, IoError> { + Ok((*(OBJECT_MAP.read().await.get(&fd).ok_or(IoError::EINVAL)?)).clone()) +} + pub(crate) fn get_object(fd: FileDescriptor) -> Result, IoError> { block_on(async_get_object(fd), None) } -async fn async_get_object(fd: FileDescriptor) -> Result, IoError> { - Ok((*(OBJECT_MAP.read().await.get(&fd).ok_or(IoError::EINVAL)?)).clone()) +#[inline] +async fn async_insert_object( + fd: FileDescriptor, + obj: Arc, +) -> Result<(), IoError> { + let _ = OBJECT_MAP.write().await.insert(fd, obj); + Ok(()) } pub(crate) fn insert_object( fd: FileDescriptor, obj: Arc, -) -> Option> { - block_on(async { Ok(OBJECT_MAP.write().await.insert(fd, obj)) }, None).unwrap() +) -> Result<(), IoError> { + block_on(async_insert_object(fd, obj), None) } // The dup system call allocates a new file descriptor that refers diff --git a/src/lib.rs b/src/lib.rs index 2d5de36e6c..9d388fac70 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -283,7 +283,7 @@ extern "C" fn initd(_arg: usize) { riscv64::kernel::init_drivers(); syscalls::init(); - fd::init().unwrap(); + fd::init().expect("Unable to initialized standard file descriptors"); fs::init(); // Get the application arguments and environment variables. diff --git a/src/syscalls/net.rs b/src/syscalls/net.rs index 3d90bc9856..03789f9f85 100644 --- a/src/syscalls/net.rs +++ b/src/syscalls/net.rs @@ -270,7 +270,7 @@ extern "C" fn __sys_socket(domain: i32, type_: i32, protocol: i32) -> i32 { drop(guard); let socket = udp::Socket::new(handle); - insert_object(fd, Arc::new(socket)); + insert_object(fd, Arc::new(socket)).expect("FD is already used"); return fd; } @@ -281,7 +281,7 @@ extern "C" fn __sys_socket(domain: i32, type_: i32, protocol: i32) -> i32 { drop(guard); let socket = tcp::Socket::new(handle); - insert_object(fd, Arc::new(socket)); + insert_object(fd, Arc::new(socket)).expect("FD is already used"); return fd; } @@ -302,9 +302,9 @@ extern "C" fn __sys_accept(fd: i32, addr: *mut sockaddr, addrlen: *mut socklen_t |e| -num::ToPrimitive::to_i32(&e).unwrap(), |endpoint| { let new_obj = dyn_clone::clone_box(&*v); - insert_object(fd, Arc::from(new_obj)); + insert_object(fd, Arc::from(new_obj)).expect("FD is already used"); let new_fd = FD_COUNTER.fetch_add(1, Ordering::SeqCst); - insert_object(new_fd, v.clone()); + insert_object(new_fd, v.clone()).expect("FD is already used"); if !addr.is_null() && !addrlen.is_null() { let addrlen = unsafe { &mut *addrlen }; From c222e8eeab03af562ef2cb61b60f17073e795070 Mon Sep 17 00:00:00 2001 From: Stefan Lankes Date: Sun, 4 Feb 2024 16:52:18 +0100 Subject: [PATCH 15/19] remove obsolete unwrapping of the network driver --- src/executor/mod.rs | 42 ++++++++++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/src/executor/mod.rs b/src/executor/mod.rs index f51ecdd4fa..2c97ce7a2a 100644 --- a/src/executor/mod.rs +++ b/src/executor/mod.rs @@ -111,12 +111,17 @@ pub(crate) fn poll_on(future: F, timeout: Option) -> Result>, { + #[cfg(any(feature = "tcp", feature = "udp"))] + let nic = get_network_driver(); + // disable network interrupts #[cfg(any(feature = "tcp", feature = "udp"))] - let no_retransmission = { - let mut guard = get_network_driver().unwrap().lock(); + let no_retransmission = if let Some(nic) = nic { + let mut guard = nic.lock(); guard.set_polling_mode(true); guard.get_checksums().tcp.tx() + } else { + true }; let start = now(); @@ -140,7 +145,9 @@ where // allow network interrupts #[cfg(any(feature = "tcp", feature = "udp"))] - get_network_driver().unwrap().lock().set_polling_mode(false); + if let Some(nic) = nic { + nic.lock().set_polling_mode(false); + } return t; } @@ -157,7 +164,9 @@ where // allow network interrupts #[cfg(any(feature = "tcp", feature = "udp"))] - get_network_driver().unwrap().lock().set_polling_mode(false); + if let Some(nic) = nic { + nic.lock().set_polling_mode(false); + } return Err(IoError::ETIME); } @@ -170,12 +179,17 @@ pub(crate) fn block_on(future: F, timeout: Option) -> Result>, { + #[cfg(any(feature = "tcp", feature = "udp"))] + let nic = get_network_driver(); + // disable network interrupts #[cfg(any(feature = "tcp", feature = "udp"))] - let no_retransmission = { - let mut guard = get_network_driver().unwrap().lock(); + let no_retransmission = if let Some(nic) = nic { + let mut guard = nic.lock(); guard.set_polling_mode(true); !guard.get_checksums().tcp.tx() + } else { + true }; let backoff = Backoff::new(); @@ -202,7 +216,9 @@ where // allow network interrupts #[cfg(any(feature = "tcp", feature = "udp"))] - get_network_driver().unwrap().lock().set_polling_mode(false); + if let Some(nic) = nic { + nic.lock().set_polling_mode(false); + } return t; } @@ -219,7 +235,9 @@ where // allow network interrupts #[cfg(any(feature = "tcp", feature = "udp"))] - get_network_driver().unwrap().lock().set_polling_mode(false); + if let Some(nic) = nic { + nic.lock().set_polling_mode(false); + } return Err(IoError::ETIME); } @@ -240,13 +258,17 @@ where } // allow network interrupts - get_network_driver().unwrap().lock().set_polling_mode(false); + if let Some(nic) = nic { + nic.lock().set_polling_mode(false); + } // switch to another task task_notify.wait(wakeup_time); // restore default values - get_network_driver().unwrap().lock().set_polling_mode(true); + if let Some(nic) = nic { + nic.lock().set_polling_mode(true); + } backoff.reset(); } else { backoff.snooze(); From afb7ca8404f48ab7a4240e1cf97e9826a31ba39e Mon Sep 17 00:00:00 2001 From: Stefan Lankes Date: Sun, 4 Feb 2024 19:32:08 +0100 Subject: [PATCH 16/19] replace read/write by async_read/write - using async_locks to reduce the blocking time --- src/fd/mod.rs | 58 ++++++++++++++++++----- src/fd/socket/tcp.rs | 42 ++--------------- src/fd/socket/udp.rs | 109 +++++++++++++++++-------------------------- src/fd/stdio.rs | 8 ++-- src/fs/uhyve.rs | 19 ++++---- src/syscalls/mod.rs | 20 ++------ src/syscalls/net.rs | 10 +--- 7 files changed, 112 insertions(+), 154 deletions(-) diff --git a/src/fd/mod.rs b/src/fd/mod.rs index 342a166f66..6647ec7d2f 100644 --- a/src/fd/mod.rs +++ b/src/fd/mod.rs @@ -161,22 +161,16 @@ pub(crate) trait ObjectInterface: Sync + Send + core::fmt::Debug + DynClone { Err(IoError::ENOSYS) } - /// `read` attempts to read `len` bytes from the object references - /// by the descriptor - fn read(&self, buf: &mut [u8]) -> Result { - block_on(self.async_read(buf), None) - } - /// `async_write` attempts to write `len` bytes to the object references /// by the descriptor async fn async_write(&self, _buf: &[u8]) -> Result { Err(IoError::ENOSYS) } - /// `write` attempts to write `len` bytes to the object references - /// by the descriptor - fn write(&self, buf: &[u8]) -> Result { - block_on(self.async_write(buf), None) + /// `is_nonblocking` returns `true`, if `read`, `write`, `recv` and send operations + /// don't block. + fn is_nonblocking(&self) -> bool { + false } /// `lseek` function repositions the offset of the file descriptor fildes @@ -327,11 +321,51 @@ pub(crate) fn close(fd: FileDescriptor) { } pub(crate) fn read(fd: FileDescriptor, buf: &mut [u8]) -> Result { - get_object(fd)?.read(buf) + let obj = get_object(fd)?; + + if buf.is_empty() { + return Ok(0); + } + + if obj.is_nonblocking() { + poll_on(obj.async_read(buf), Some(Duration::ZERO.into())).map_err(|x| { + if x == IoError::ETIME { + IoError::EAGAIN + } else { + x + } + }) + } else { + match poll_on(obj.async_read(buf), Some(Duration::from_secs(2).into())) { + Err(IoError::ETIME) => block_on(obj.async_read(buf), None), + Err(x) => Err(x), + Ok(x) => Ok(x), + } + } } pub(crate) fn write(fd: FileDescriptor, buf: &[u8]) -> Result { - get_object(fd)?.write(buf) + let obj = get_object(fd)?; + + if buf.is_empty() { + return Ok(0); + } + + if obj.is_nonblocking() { + poll_on(obj.async_write(buf), Some(Duration::ZERO.into())).map_err(|x| { + if x == IoError::ETIME { + IoError::EAGAIN + } else { + x + } + }) + } else { + match poll_on(obj.async_write(buf), Some(Duration::from_secs(2).into())) { + Err(IoError::ETIME) => block_on(obj.async_write(buf), None), + Err(x) => Err(x), + Ok(x) => Ok(x), + } + } } async fn poll_fds(fds: &mut [PollFd]) -> Result<(), IoError> { diff --git a/src/fd/socket/tcp.rs b/src/fd/socket/tcp.rs index 3b089b9f73..b9c8da9257 100644 --- a/src/fd/socket/tcp.rs +++ b/src/fd/socket/tcp.rs @@ -10,8 +10,8 @@ use smoltcp::socket::tcp; use smoltcp::time::Duration; use smoltcp::wire::{IpEndpoint, IpListenEndpoint}; +use crate::executor::block_on; use crate::executor::network::{now, Handle, NetworkState, NIC}; -use crate::executor::{block_on, poll_on}; use crate::fd::{IoCtl, IoError, ObjectInterface, PollEvent, SocketOption}; use crate::syscalls::net::*; use crate::DEFAULT_KEEP_ALIVE_INTERVAL; @@ -321,44 +321,8 @@ impl ObjectInterface for Socket { self.with(|socket| socket.local_endpoint()) } - fn read(&self, buf: &mut [u8]) -> Result { - if buf.is_empty() { - return Ok(0); - } - - if self.nonblocking.load(Ordering::Acquire) { - poll_on(self.async_read(buf), Some(Duration::ZERO.into())).map_err(|x| { - if x == IoError::ETIME { - IoError::EAGAIN - } else { - x - } - }) - } else { - match poll_on(self.async_read(buf), Some(Duration::from_secs(2).into())) { - Err(IoError::ETIME) => block_on(self.async_read(buf), None), - Err(x) => Err(x), - Ok(x) => Ok(x), - } - } - } - - fn write(&self, buf: &[u8]) -> Result { - if buf.is_empty() { - return Ok(0); - } - - if self.nonblocking.load(Ordering::Acquire) { - poll_on(self.async_write(buf), Some(Duration::ZERO.into())).map_err(|x| { - if x == IoError::ETIME { - IoError::EAGAIN - } else { - x - } - }) - } else { - poll_on(self.async_write(buf), None) - } + fn is_nonblocking(&self) -> bool { + self.nonblocking.load(Ordering::Acquire) } fn listen(&self, _backlog: i32) -> Result<(), IoError> { diff --git a/src/fd/socket/udp.rs b/src/fd/socket/udp.rs index 67055f6221..b4940546f4 100644 --- a/src/fd/socket/udp.rs +++ b/src/fd/socket/udp.rs @@ -56,38 +56,6 @@ impl Socket { .await } - async fn async_read(&self, buffer: &mut [u8]) -> Result { - future::poll_fn(|cx| { - self.with(|socket| { - if socket.is_open() { - if socket.can_recv() { - match socket.recv_slice(buffer) { - Ok((len, meta)) => match self.endpoint.load() { - Some(ep) => { - if meta.endpoint == ep { - Poll::Ready(Ok(len)) - } else { - buffer[..len].iter_mut().for_each(|x| *x = 0); - socket.register_recv_waker(cx.waker()); - Poll::Pending - } - } - None => Poll::Ready(Ok(len)), - }, - _ => Poll::Ready(Err(IoError::EIO)), - } - } else { - socket.register_recv_waker(cx.waker()); - Poll::Pending - } - } else { - Poll::Ready(Err(IoError::EIO)) - } - }) - }) - .await - } - async fn async_recvfrom(&self, buffer: &mut [u8]) -> Result<(usize, IpEndpoint), IoError> { future::poll_fn(|cx| { self.with(|socket| { @@ -120,7 +88,11 @@ impl Socket { .await } - async fn async_write(&self, buffer: &[u8], meta: &UdpMetadata) -> Result { + async fn async_write_with_meta( + &self, + buffer: &[u8], + meta: &UdpMetadata, + ) -> Result { future::poll_fn(|cx| { self.with(|socket| { if socket.is_open() { @@ -199,9 +171,12 @@ impl ObjectInterface for Socket { let meta = UdpMetadata::from(endpoint); if self.nonblocking.load(Ordering::Acquire) { - poll_on(self.async_write(buf, &meta), Some(Duration::ZERO.into())) + poll_on( + self.async_write_with_meta(buf, &meta), + Some(Duration::ZERO.into()), + ) } else { - poll_on(self.async_write(buf, &meta), None) + poll_on(self.async_write_with_meta(buf, &meta), None) } } @@ -226,44 +201,44 @@ impl ObjectInterface for Socket { } } - fn read(&self, buf: &mut [u8]) -> Result { - if buf.len() == 0 { - return Ok(0); - } - - if self.nonblocking.load(Ordering::Acquire) { - poll_on(self.async_read(buf), Some(Duration::ZERO.into())).map_err(|x| { - if x == IoError::ETIME { - IoError::EAGAIN + async fn async_read(&self, buffer: &mut [u8]) -> Result { + future::poll_fn(|cx| { + self.with(|socket| { + if socket.is_open() { + if socket.can_recv() { + match socket.recv_slice(buffer) { + Ok((len, meta)) => match self.endpoint.load() { + Some(ep) => { + if meta.endpoint == ep { + Poll::Ready(Ok(len)) + } else { + buffer[..len].iter_mut().for_each(|x| *x = 0); + socket.register_recv_waker(cx.waker()); + Poll::Pending + } + } + None => Poll::Ready(Ok(len)), + }, + _ => Poll::Ready(Err(IoError::EIO)), + } + } else { + socket.register_recv_waker(cx.waker()); + Poll::Pending + } } else { - x + Poll::Ready(Err(IoError::EIO)) } }) - } else { - match poll_on(self.async_read(buf), Some(Duration::from_secs(2).into())) { - Err(IoError::ETIME) => block_on(self.async_read(buf), None), - Err(x) => Err(x), - Ok(x) => Ok(x), - } - } + }) + .await } - fn write(&self, buf: &[u8]) -> Result { - if buf.len() == 0 { - return Ok(0); - } - - let endpoint = self.endpoint.load(); - if endpoint.is_none() { - return Err(IoError::EINVAL); - } - - let meta = UdpMetadata::from(endpoint.unwrap()); - - if self.nonblocking.load(Ordering::Acquire) { - poll_on(self.async_write(buf, &meta), Some(Duration::ZERO.into())) + async fn async_write(&self, buf: &[u8]) -> Result { + if let Some(endpoint) = self.endpoint.load() { + let meta = UdpMetadata::from(endpoint); + self.async_write_with_meta(buf, &meta).await } else { - poll_on(self.async_write(buf, &meta), None) + Err(IoError::EINVAL) } } diff --git a/src/fd/stdio.rs b/src/fd/stdio.rs index 55fa67ae8f..f2940493ca 100644 --- a/src/fd/stdio.rs +++ b/src/fd/stdio.rs @@ -94,7 +94,7 @@ impl ObjectInterface for GenericStdout { Ok(result) } - fn write(&self, buf: &[u8]) -> Result { + async fn async_write(&self, buf: &[u8]) -> Result { // stdin/err/out all go to console CONSOLE.lock().write_all(buf); @@ -127,7 +127,7 @@ impl ObjectInterface for GenericStderr { Ok(result) } - fn write(&self, buf: &[u8]) -> Result { + async fn async_write(&self, buf: &[u8]) -> Result { // stdin/err/out all go to console CONSOLE.lock().write_all(buf); @@ -171,7 +171,7 @@ impl ObjectInterface for UhyveStdout { Ok(result) } - fn write(&self, buf: &[u8]) -> Result { + async fn async_write(&self, buf: &[u8]) -> Result { let mut syswrite = SysWrite::new(STDOUT_FILENO, buf.as_ptr(), buf.len()); uhyve_send(UHYVE_PORT_WRITE, &mut syswrite); @@ -204,7 +204,7 @@ impl ObjectInterface for UhyveStderr { Ok(result) } - fn write(&self, buf: &[u8]) -> Result { + async fn async_write(&self, buf: &[u8]) -> Result { let mut syswrite = SysWrite::new(STDERR_FILENO, buf.as_ptr(), buf.len()); uhyve_send(UHYVE_PORT_WRITE, &mut syswrite); diff --git a/src/fs/uhyve.rs b/src/fs/uhyve.rs index 12b624b097..213a85ca55 100644 --- a/src/fs/uhyve.rs +++ b/src/fs/uhyve.rs @@ -6,12 +6,14 @@ use alloc::vec::Vec; #[cfg(any(target_arch = "x86_64", target_arch = "aarch64"))] use core::ptr; -use hermit_sync::SpinMutex; +use async_lock::Mutex; +use async_trait::async_trait; #[cfg(target_arch = "x86_64")] use x86::io::outl; use crate::arch::mm::{paging, PhysAddr, VirtAddr}; use crate::env::is_uhyve; +use crate::executor::block_on; use crate::fd::IoError; use crate::fs::{ self, AccessPermission, FileAttr, NodeKind, ObjectInterface, OpenOption, SeekWhence, VfsNode, @@ -201,25 +203,26 @@ impl Drop for UhyveFileHandleInner { } #[derive(Debug)] -struct UhyveFileHandle(pub Arc>); +struct UhyveFileHandle(pub Arc>); impl UhyveFileHandle { pub fn new(fd: i32) -> Self { - Self(Arc::new(SpinMutex::new(UhyveFileHandleInner::new(fd)))) + Self(Arc::new(Mutex::new(UhyveFileHandleInner::new(fd)))) } } +#[async_trait] impl ObjectInterface for UhyveFileHandle { - fn read(&self, buf: &mut [u8]) -> Result { - self.0.lock().read(buf) + async fn async_read(&self, buf: &mut [u8]) -> Result { + self.0.lock().await.read(buf) } - fn write(&self, buf: &[u8]) -> Result { - self.0.lock().write(buf) + async fn async_write(&self, buf: &[u8]) -> Result { + self.0.lock().await.write(buf) } fn lseek(&self, offset: isize, whence: SeekWhence) -> Result { - self.0.lock().lseek(offset, whence) + block_on(async { self.0.lock().await.lseek(offset, whence) }, None) } } diff --git a/src/syscalls/mod.rs b/src/syscalls/mod.rs index e9bf260c9d..d6ab08beaa 100644 --- a/src/syscalls/mod.rs +++ b/src/syscalls/mod.rs @@ -261,15 +261,9 @@ pub extern "C" fn sys_close(fd: FileDescriptor) -> i32 { extern "C" fn __sys_read(fd: FileDescriptor, buf: *mut u8, len: usize) -> isize { let slice = unsafe { core::slice::from_raw_parts_mut(buf, len) }; - let obj = get_object(fd); - obj.map_or_else( + crate::fd::read(fd, slice).map_or_else( |e| -num::ToPrimitive::to_isize(&e).unwrap(), - |v| { - (*v).read(slice).map_or_else( - |e| -num::ToPrimitive::to_isize(&e).unwrap(), - |v| v.try_into().unwrap(), - ) - }, + |v| v.try_into().unwrap(), ) } @@ -280,15 +274,9 @@ pub extern "C" fn sys_read(fd: FileDescriptor, buf: *mut u8, len: usize) -> isiz extern "C" fn __sys_write(fd: FileDescriptor, buf: *const u8, len: usize) -> isize { let slice = unsafe { core::slice::from_raw_parts(buf, len) }; - let obj = get_object(fd); - obj.map_or_else( + crate::fd::write(fd, slice).map_or_else( |e| -num::ToPrimitive::to_isize(&e).unwrap(), - |v| { - (*v).write(slice).map_or_else( - |e| -num::ToPrimitive::to_isize(&e).unwrap(), - |v| v.try_into().unwrap(), - ) - }, + |v| v.try_into().unwrap(), ) } diff --git a/src/syscalls/net.rs b/src/syscalls/net.rs index 03789f9f85..043dc47e00 100644 --- a/src/syscalls/net.rs +++ b/src/syscalls/net.rs @@ -564,15 +564,9 @@ extern "C" fn __sys_shutdown_socket(fd: i32, how: i32) -> i32 { extern "C" fn __sys_recv(fd: i32, buf: *mut u8, len: usize) -> isize { let slice = unsafe { core::slice::from_raw_parts_mut(buf, len) }; - let obj = get_object(fd); - obj.map_or_else( + crate::fd::read(fd, slice).map_or_else( |e| -num::ToPrimitive::to_isize(&e).unwrap(), - |v| { - (*v).read(slice).map_or_else( - |e| -num::ToPrimitive::to_isize(&e).unwrap(), - |v| v.try_into().unwrap(), - ) - }, + |v| v.try_into().unwrap(), ) } From 24a3b2217947f682b59a339e3730078311a5f1c7 Mon Sep 17 00:00:00 2001 From: Stefan Lankes Date: Sun, 4 Feb 2024 19:37:36 +0100 Subject: [PATCH 17/19] remove clippy warnings --- src/fd/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/fd/mod.rs b/src/fd/mod.rs index 6647ec7d2f..ac6fe6b8b6 100644 --- a/src/fd/mod.rs +++ b/src/fd/mod.rs @@ -328,7 +328,7 @@ pub(crate) fn read(fd: FileDescriptor, buf: &mut [u8]) -> Result } if obj.is_nonblocking() { - poll_on(obj.async_read(buf), Some(Duration::ZERO.into())).map_err(|x| { + poll_on(obj.async_read(buf), Some(Duration::ZERO)).map_err(|x| { if x == IoError::ETIME { IoError::EAGAIN } else { @@ -336,7 +336,7 @@ pub(crate) fn read(fd: FileDescriptor, buf: &mut [u8]) -> Result } }) } else { - match poll_on(obj.async_read(buf), Some(Duration::from_secs(2).into())) { + match poll_on(obj.async_read(buf), Some(Duration::from_secs(2))) { Err(IoError::ETIME) => block_on(obj.async_read(buf), None), Err(x) => Err(x), Ok(x) => Ok(x), @@ -352,7 +352,7 @@ pub(crate) fn write(fd: FileDescriptor, buf: &[u8]) -> Result { } if obj.is_nonblocking() { - poll_on(obj.async_write(buf), Some(Duration::ZERO.into())).map_err(|x| { + poll_on(obj.async_write(buf), Some(Duration::ZERO)).map_err(|x| { if x == IoError::ETIME { IoError::EAGAIN } else { @@ -360,7 +360,7 @@ pub(crate) fn write(fd: FileDescriptor, buf: &[u8]) -> Result { } }) } else { - match poll_on(obj.async_write(buf), Some(Duration::from_secs(2).into())) { + match poll_on(obj.async_write(buf), Some(Duration::from_secs(2))) { Err(IoError::ETIME) => block_on(obj.async_write(buf), None), Err(x) => Err(x), Ok(x) => Ok(x), From 841c2c15beaa6fe0c2d1298c2d823a47f19b53e3 Mon Sep 17 00:00:00 2001 From: Stefan Lankes Date: Sun, 4 Feb 2024 20:17:58 +0100 Subject: [PATCH 18/19] introduce an async lock for stdin and stderr --- src/console.rs | 18 ++++++++++-------- src/fd/stdio.rs | 13 ++++++++----- src/lib.rs | 8 +------- src/macros.rs | 4 ++-- 4 files changed, 21 insertions(+), 22 deletions(-) diff --git a/src/console.rs b/src/console.rs index 2d51bcd6e3..f22fe354bf 100644 --- a/src/console.rs +++ b/src/console.rs @@ -4,7 +4,7 @@ use hermit_sync::InterruptTicketMutex; use crate::arch; -pub struct Console(()); +pub(crate) struct Console(()); /// A collection of methods that are required to format /// a message to Hermit's console. @@ -21,14 +21,16 @@ impl fmt::Write for Console { } } -impl Console { - #[inline] - pub fn write_all(&mut self, buf: &[u8]) { - arch::output_message_buf(buf) - } -} - +#[cfg(feature = "newlib")] pub static CONSOLE: InterruptTicketMutex = InterruptTicketMutex::new(Console(())); +#[cfg(not(feature = "newlib"))] +static CONSOLE: InterruptTicketMutex = InterruptTicketMutex::new(Console(())); + +#[doc(hidden)] +pub fn _print(args: ::core::fmt::Arguments<'_>) { + use core::fmt::Write; + CONSOLE.lock().write_fmt(args).unwrap(); +} #[cfg(all(test, not(target_os = "none")))] mod tests { diff --git a/src/fd/stdio.rs b/src/fd/stdio.rs index f2940493ca..ba84177bae 100644 --- a/src/fd/stdio.rs +++ b/src/fd/stdio.rs @@ -2,17 +2,20 @@ use alloc::boxed::Box; #[cfg(any(target_arch = "x86_64", target_arch = "aarch64"))] use core::ptr; +use async_lock::Mutex; use async_trait::async_trait; #[cfg(target_arch = "x86_64")] use x86::io::*; +use crate::arch; #[cfg(any(target_arch = "x86_64", target_arch = "aarch64"))] use crate::arch::mm::{paging, VirtAddr}; -use crate::console::CONSOLE; use crate::fd::{IoError, ObjectInterface, PollEvent, STDERR_FILENO, STDOUT_FILENO}; const UHYVE_PORT_WRITE: u16 = 0x400; +static IO_LOCK: Mutex<()> = Mutex::new(()); + #[repr(C, packed)] struct SysWrite { fd: i32, @@ -95,8 +98,8 @@ impl ObjectInterface for GenericStdout { } async fn async_write(&self, buf: &[u8]) -> Result { - // stdin/err/out all go to console - CONSOLE.lock().write_all(buf); + let _guard = IO_LOCK.lock().await; + arch::output_message_buf(buf); Ok(buf.len()) } @@ -128,8 +131,8 @@ impl ObjectInterface for GenericStderr { } async fn async_write(&self, buf: &[u8]) -> Result { - // stdin/err/out all go to console - CONSOLE.lock().write_all(buf); + let _guard = IO_LOCK.lock().await; + arch::output_message_buf(buf); Ok(buf.len()) } diff --git a/src/lib.rs b/src/lib.rs index 9d388fac70..b3190aa5f0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -77,7 +77,7 @@ mod logging; mod arch; mod config; -mod console; +pub mod console; mod drivers; mod entropy; mod env; @@ -95,12 +95,6 @@ pub mod time; #[cfg(target_os = "none")] hermit_entry::define_entry_version!(); -#[doc(hidden)] -pub fn _print(args: ::core::fmt::Arguments<'_>) { - use core::fmt::Write; - crate::console::CONSOLE.lock().write_fmt(args).unwrap(); -} - #[cfg(test)] #[cfg(target_os = "none")] #[no_mangle] diff --git a/src/macros.rs b/src/macros.rs index c127bab229..4dca430372 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -7,7 +7,7 @@ #[macro_export] macro_rules! print { ($($arg:tt)*) => {{ - $crate::_print(::core::format_args!($($arg)*)); + $crate::console::_print(::core::format_args!($($arg)*)); }}; } @@ -23,7 +23,7 @@ macro_rules! println { $crate::print!("\n") }; ($($arg:tt)*) => {{ - $crate::_print(::core::format_args!("{}\n", format_args!($($arg)*))); + $crate::console::_print(::core::format_args!("{}\n", format_args!($($arg)*))); }}; } From 5dbbb03c2492ea3f1cb3b2b41cbc7527cab71372 Mon Sep 17 00:00:00 2001 From: Stefan Lankes Date: Sun, 4 Feb 2024 23:57:24 +0100 Subject: [PATCH 19/19] remove merge conflicts --- src/fs/mem.rs | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/src/fs/mem.rs b/src/fs/mem.rs index eda319101c..601039a154 100644 --- a/src/fs/mem.rs +++ b/src/fs/mem.rs @@ -441,10 +441,6 @@ impl VfsNode for MemDirectory { if let Some(component) = components.pop() { let node_name = String::from(component); - if let Some(directory) = self.inner.read().await.get(&node_name) { - return directory.traverse_rmdir(components); - } - if components.is_empty() { let mut guard = self.inner.write().await; @@ -455,6 +451,8 @@ impl VfsNode for MemDirectory { guard.insert(node_name, obj); return Err(IoError::ENOTDIR); } + } else if let Some(directory) = self.inner.read().await.get(&node_name) { + return directory.traverse_rmdir(components); } } @@ -470,20 +468,18 @@ impl VfsNode for MemDirectory { if let Some(component) = components.pop() { let node_name = String::from(component); - if let Some(directory) = self.inner.read().await.get(&node_name) { - return directory.traverse_unlink(components); - } - if components.is_empty() { let mut guard = self.inner.write().await; let obj = guard.remove(&node_name).ok_or(IoError::ENOENT)?; - if obj.get_kind() == NodeKind::Directory { + if obj.get_kind() == NodeKind::File { + return Ok(()); + } else { guard.insert(node_name, obj); return Err(IoError::EISDIR); - } else { - return Ok(()); } + } else if let Some(directory) = self.inner.read().await.get(&node_name) { + return directory.traverse_unlink(components); } }