
virtqueue: do not return Transfers to dispatchers
Sharing the transfer with the sender is not really necessary; the sender
can be notified through an await queue. Change the relevant methods to
let the virtqueue own the Transfers and avoid reference-counting
complications.
cagatay-y committed Jan 2, 2024
1 parent 8228d21 commit 293703f
Showing 4 changed files with 74 additions and 131 deletions.
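Before the diff itself, here is a minimal, self-contained Rust sketch of the new ownership model described in the commit message. It is not the driver's actual code: `Virtqueue`, `in_flight`, and the `u32` payload are hypothetical stand-ins. The point it illustrates is that the virtqueue keeps the in-flight tokens to itself, and a dispatcher that wants the result registers a shared await queue that is filled when the queue is polled, mirroring the `dispatch_await`/`dispatch_blocking` flow shown in the diff below.

use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

struct Transfer {
    data: u32, // stand-in for the real buffer token
}

#[derive(Default)]
struct Virtqueue {
    // Tokens owned by the queue while the (simulated) device processes them.
    in_flight: Vec<(u32, Rc<RefCell<VecDeque<Transfer>>>)>,
}

impl Virtqueue {
    // The dispatcher no longer gets a Transfer back; it only registers a queue
    // on which it will be notified.
    fn dispatch_await(&mut self, data: u32, await_queue: Rc<RefCell<VecDeque<Transfer>>>) {
        self.in_flight.push((data, await_queue));
    }

    // Polling the queue moves finished Transfers into their await queues.
    fn poll(&mut self) {
        for (data, queue) in self.in_flight.drain(..) {
            queue.borrow_mut().push_back(Transfer { data });
        }
    }
}

fn main() {
    let mut vq = Virtqueue::default();
    let rcv_queue = Rc::new(RefCell::new(VecDeque::with_capacity(1)));

    vq.dispatch_await(42, Rc::clone(&rcv_queue));

    // Blocking wait, mirroring the new `dispatch_blocking`: spin until the
    // await queue is filled instead of inspecting a shared token.
    while rcv_queue.borrow().is_empty() {
        vq.poll();
    }

    let transfer = rcv_queue.borrow_mut().pop_front().unwrap();
    assert_eq!(transfer.data, 42);
}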
59 changes: 24 additions & 35 deletions src/drivers/virtio/virtqueue/mod.rs
@@ -127,11 +127,11 @@ impl Virtq {
/// The `notif` parameter indicates if the driver wants to have a notification for this specific
/// transfer. This is only a performance optimization, as it is NOT ensured that the device sees the
/// updated notification flags before finishing transfers!
fn dispatch(&self, tkn: TransferToken, notif: bool) -> Transfer {
fn dispatch(&self, tkn: TransferToken, notif: bool) {
match self {
Virtq::Packed(vq) => vq.dispatch(tkn, notif),
Virtq::Split(vq) => vq.dispatch(tkn, notif),
}
};
}
}

@@ -225,7 +225,7 @@ impl Virtq {
/// The `notif` parameter indicates if the driver wants to have a notification for this specific
/// transfer. This is only a performance optimization, as it is NOT ensured that the device sees the
/// updated notification flags before finishing transfers!
pub fn dispatch_batch(tkns: Vec<TransferToken>, notif: bool) -> Vec<Transfer> {
pub fn dispatch_batch(tkns: Vec<TransferToken>, notif: bool) {
let mut used_vqs: Vec<(Rc<Virtq>, Vec<TransferToken>)> = Vec::new();

// Sort the TransferTokens depending on the queue they are coming from.
@@ -255,17 +255,12 @@ impl Virtq {
}
}

let mut transfer_lst = Vec::new();
for (vq_ref, tkn_lst) in used_vqs {
match vq_ref.as_ref() {
Virtq::Packed(vq) => {
transfer_lst.append(vq.dispatch_batch(tkn_lst, notif).as_mut())
}
Virtq::Split(vq) => transfer_lst.append(vq.dispatch_batch(tkn_lst, notif).as_mut()),
Virtq::Packed(vq) => vq.dispatch_batch(tkn_lst, notif),
Virtq::Split(vq) => vq.dispatch_batch(tkn_lst, notif),
}
}

transfer_lst
}

/// Dispatches a batch of TransferTokens. The Transfers will be placed into the `await_queue`
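The `dispatch_batch` method shown earlier in this hunk first groups the TransferTokens by the queue they target and then dispatches each group once. The actual comparison used by the driver is hidden in the collapsed context; the stand-alone illustration below uses `Rc` pointer identity as one plausible way to do it, with a hypothetical `group_by_queue` helper and `Rc<u32>` standing in for `Rc<Virtq>`.

use std::rc::Rc;

fn group_by_queue<T>(items: Vec<(Rc<u32>, T)>) -> Vec<(Rc<u32>, Vec<T>)> {
    let mut groups: Vec<(Rc<u32>, Vec<T>)> = Vec::new();
    for (queue, item) in items {
        // Compare queues by pointer identity rather than by value.
        if let Some(pos) = groups.iter().position(|(q, _)| Rc::ptr_eq(q, &queue)) {
            groups[pos].1.push(item);
        } else {
            groups.push((queue, vec![item]));
        }
    }
    groups
}

fn main() {
    let q1 = Rc::new(1);
    let q2 = Rc::new(2);
    let grouped = group_by_queue(vec![(q1.clone(), "a"), (q2.clone(), "b"), (q1.clone(), "c")]);
    assert_eq!(grouped.len(), 2);
    assert_eq!(grouped[0].1, vec!["a", "c"]);
}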
@@ -566,7 +561,7 @@ pub trait AsSliceU8 {
pub struct Transfer {
/// Needs to be `Option<Pinned<TransferToken>>` in order to prevent deallocation via None
// See custom drop function for clarity
transfer_tkn: Option<Rc<TransferToken>>,
transfer_tkn: Option<Box<TransferToken>>,
}

// Public Interface of Transfer
@@ -670,12 +665,13 @@ impl Transfer {
send_buff,
recv_buff,
..
} = unsafe {
Rc::get_mut_unchecked(self.transfer_tkn.as_mut().unwrap())
.buff_tkn
.as_mut()
.unwrap()
};
} = self
.transfer_tkn
.as_mut()
.unwrap()
.buff_tkn
.as_mut()
.unwrap();

(send_buff.as_mut(), recv_buff.as_mut())
};
@@ -835,8 +831,7 @@ impl Transfer {
match state {
TransferState::Finished => {
// Destructure the token
let mut transfer_tkn = Rc::into_inner(self.transfer_tkn.take().unwrap())
.expect("There should not have been other references to the token.");
let mut transfer_tkn = self.transfer_tkn.take().unwrap();

let mut buffer_tkn = transfer_tkn.buff_tkn.take().unwrap();

@@ -904,11 +899,7 @@ impl Transfer {
.unwrap()
.reusable
{
let tkn = Rc::into_inner(self.transfer_tkn.take().unwrap())
.expect("There should not have been other references to the token")
.buff_tkn
.take()
.unwrap();
let tkn = self.transfer_tkn.take().unwrap().buff_tkn.take().unwrap();

Ok(tkn.reset())
} else {
@@ -941,11 +932,7 @@ impl Transfer {
.unwrap()
.reusable
{
let tkn = Rc::into_inner(self.transfer_tkn.take().unwrap())
.expect("There should not have been other references to the token")
.buff_tkn
.take()
.unwrap();
let tkn = self.transfer_tkn.take().unwrap().buff_tkn.take().unwrap();

Ok(tkn.reset_purge())
} else {
@@ -1009,8 +996,8 @@ impl TransferToken {
/// The `notif` parameter indicates if the driver wants to have a notification for this specific
/// transfer. This is only a performance optimization, as it is NOT ensured that the device sees the
/// updated notification flags before finishing transfers!
pub fn dispatch(self, notif: bool) -> Transfer {
self.get_vq().dispatch(self, notif)
pub fn dispatch(self, notif: bool) {
self.get_vq().dispatch(self, notif);
}

/// Dispatches the provided TransferToken to the respective queue and does
@@ -1025,18 +1012,20 @@
/// Upon finish notifications are enabled again.
pub fn dispatch_blocking(self) -> Result<Transfer, VirtqError> {
let vq = self.get_vq();
let transfer = self.get_vq().dispatch(self, false);
let rcv_queue = Rc::new(RefCell::new(VecDeque::with_capacity(1)));
self.dispatch_await(rcv_queue.clone(), false);

vq.disable_notifs();

while transfer.transfer_tkn.as_ref().unwrap().state != TransferState::Finished {
// Keep Spinning until the state changes to Finished
while rcv_queue.borrow().is_empty() {
// Keep Spinning until the receive queue is filled
vq.poll()
}

vq.enable_notifs();

Ok(transfer)
let result = Ok(rcv_queue.borrow_mut().pop_front().unwrap());
result
}
}

84 changes: 28 additions & 56 deletions src/drivers/virtio/virtqueue/packed.rs
@@ -7,6 +7,7 @@ use alloc::collections::VecDeque;
use alloc::rc::Rc;
use alloc::vec::Vec;
use core::cell::RefCell;
use core::mem::MaybeUninit;
use core::ptr;
use core::sync::atomic::{fence, Ordering};

@@ -88,7 +89,7 @@ impl WrapCount {
struct DescriptorRing {
ring: &'static mut [Descriptor],
//ring: Pinned<Vec<Descriptor>>,
tkn_ref_ring: Box<[Option<Rc<TransferToken>>]>,
tkn_ref_ring: Box<[MaybeUninit<Box<TransferToken>>]>,

// Controlling variables for the ring
//
@@ -117,7 +118,7 @@ impl DescriptorRing {
// Descriptor IDs run from 1 to size_of_queue. In order to index directly into the
// reference ring via an ID, it is much easier to simply have an array of size = size_of_queue + 1
// and not care about the first element being unused.
let tkn_ref_ring = vec![None; size + 1].into_boxed_slice();
let tkn_ref_ring = Box::new_uninit_slice(size + 1);

DescriptorRing {
ring,
@@ -142,26 +143,23 @@
// the dereferencing operation undefined behaviour as also
// all operations on the reference.

unsafe {
let tkn_ref = Rc::get_mut_unchecked(&mut tkn);
tkn_ref.state = TransferState::Finished;
if let Some(queue) = tkn_ref.await_queue.take() {
// Turn the raw pointer into a Pinned again, which will hold ownership of the Token
queue.borrow_mut().push_back(Transfer {
transfer_tkn: Some(tkn),
});
}
tkn.state = TransferState::Finished;
if let Some(queue) = tkn.await_queue.take() {
// Hand ownership of the token to the await queue, wrapped in a Transfer.
queue.borrow_mut().push_back(Transfer {
transfer_tkn: Some(tkn),
});
}
}
}

fn push_batch(&mut self, tkn_lst: Vec<TransferToken>) -> (Vec<Rc<TransferToken>>, usize, u8) {
fn push_batch(&mut self, tkn_lst: Vec<TransferToken>) -> (usize, u8) {
// Catch empty push, in order to allow zero initialized first_ctrl_settings struct
// which will be overwritten in the first iteration of the for-loop
assert!(!tkn_lst.is_empty());

let mut first_ctrl_settings: (usize, u16, WrapCount) = (0, 0, WrapCount::new());
let mut pind_lst = Vec::with_capacity(tkn_lst.len());
let mut first_buffer = MaybeUninit::uninit();

for (i, mut tkn) in tkn_lst.into_iter().enumerate() {
// Check length and if it fits. This should always be true due to the restriction of
@@ -276,36 +274,29 @@ impl DescriptorRing {
// Update the state of the actual Token
tkn.state = TransferState::Processing;

let tkn = Rc::new(tkn);

if i == 0 {
first_ctrl_settings = (ctrl.start, ctrl.buff_id, ctrl.wrap_at_init);
first_buffer = MaybeUninit::new(Box::new(tkn));
} else {
// Update flags of the first descriptor and set new write_index
ctrl.make_avail(tkn.clone());
ctrl.make_avail(Box::new(tkn));
}

pind_lst.push(tkn.clone());
}
// Manually make the first buffer available last
//
// Providing the first buffer in the list manually
// provides the reference, in order to let the TransferToken know upon finish.
self.tkn_ref_ring[usize::from(first_ctrl_settings.1)] = Some(pind_lst[0].clone());
self.tkn_ref_ring[usize::from(first_ctrl_settings.1)] = first_buffer;
// The driver performs a suitable memory barrier to ensure the device sees the updated descriptor table and available ring before the next step.
// See Virtio specification v1.1 - 2.7.21
fence(Ordering::SeqCst);
self.ring[first_ctrl_settings.0].flags |= first_ctrl_settings.2.as_flags_avail();

// Converting a boolean to u8 is fine
(
pind_lst,
first_ctrl_settings.0,
first_ctrl_settings.2 .0 as u8,
)
(first_ctrl_settings.0, first_ctrl_settings.2 .0 as u8)
}

fn push(&mut self, mut tkn: TransferToken) -> (Rc<TransferToken>, usize, u8) {
fn push(&mut self, mut tkn: TransferToken) -> (usize, u8) {
// Check length and if it fits. This should always be true due to the restriction of
// the memory pool, but to be sure.
assert!(tkn.buff_tkn.as_ref().unwrap().num_consuming_descr() <= self.capacity);
@@ -412,16 +403,13 @@ impl DescriptorRing {
// Update the state of the actual Token
tkn.state = TransferState::Processing;

fence(Ordering::SeqCst);
let tkn = Rc::new(tkn);

fence(Ordering::SeqCst);
// Update flags of the first descriptor and set new write_index
ctrl.make_avail(tkn.clone());
ctrl.make_avail(Box::new(tkn));
fence(Ordering::SeqCst);

// Converting a boolean to u8 is fine
(tkn, ctrl.start, ctrl.wrap_at_init.0 as u8)
(ctrl.start, ctrl.wrap_at_init.0 as u8)
}

/// # Unsafe
@@ -467,22 +455,20 @@ struct ReadCtrl<'a> {
impl<'a> ReadCtrl<'a> {
/// Polls the ring for a new finished buffer. If the buffer is marked as used, this takes care of
/// updating the queue and returns the respective TransferToken.
fn poll_next(&mut self) -> Option<Rc<TransferToken>> {
fn poll_next(&mut self) -> Option<Box<TransferToken>> {
// Check if descriptor has been marked used.
if self.desc_ring.ring[self.position].flags & WrapCount::flag_mask()
== self.desc_ring.dev_wc.as_flags_used()
{
let buff_id = usize::from(self.desc_ring.ring[self.position].buff_id);
let mut tkn = self.desc_ring.tkn_ref_ring[buff_id]
.take()
.expect("TransferToken at position does not exist");
let mut tkn = unsafe { self.desc_ring.tkn_ref_ring[buff_id].assume_init_read() };

let (send_buff, recv_buff) = {
let BufferToken {
send_buff,
recv_buff,
..
} = unsafe { Rc::get_mut_unchecked(&mut tkn).buff_tkn.as_mut().unwrap() };
} = tkn.buff_tkn.as_mut().unwrap();
(recv_buff.as_mut(), send_buff.as_mut())
};

@@ -789,14 +775,14 @@ impl<'a> WriteCtrl<'a> {
}
}

fn make_avail(&mut self, raw_tkn: Rc<TransferToken>) {
fn make_avail(&mut self, raw_tkn: Box<TransferToken>) {
// We fail if one wants to make a buffer available without inserting one element!
assert!(self.start != self.position);
// We also fail if buff_id is not set!
assert!(self.buff_id != 0);

// provides the reference, in order to let the TransferToken know upon finish.
self.desc_ring.tkn_ref_ring[usize::from(self.buff_id)] = Some(raw_tkn);
self.desc_ring.tkn_ref_ring[usize::from(self.buff_id)] = MaybeUninit::new(raw_tkn);
// The driver performs a suitable memory barrier to ensure the device sees the updated descriptor table and available ring before the next step.
// See Virtio specification v1.1 - 2.7.21
fence(Ordering::SeqCst);
Expand Down Expand Up @@ -1017,11 +1003,11 @@ impl PackedVq {
/// The `notif` parameter indicates if the driver wants to have a notification for this specific
/// transfer. This is only a performance optimization, as it is NOT ensured that the device sees the
/// updated notification flags before finishing transfers!
pub fn dispatch_batch(&self, tkns: Vec<TransferToken>, notif: bool) -> Vec<Transfer> {
pub fn dispatch_batch(&self, tkns: Vec<TransferToken>, notif: bool) {
// Zero transfers are not allowed
assert!(!tkns.is_empty());

let (pin_tkn_lst, next_off, next_wrap) = self.descr_ring.borrow_mut().push_batch(tkns);
let (next_off, next_wrap) = self.descr_ring.borrow_mut().push_batch(tkns);

if notif {
self.drv_event
@@ -1048,16 +1034,6 @@

self.notif_ctrl.notify_dev(&notif_data)
}

let mut transfer_lst = Vec::with_capacity(pin_tkn_lst.len());

for pinned in pin_tkn_lst {
transfer_lst.push(Transfer {
transfer_tkn: Some(pinned),
})
}

transfer_lst
}

/// Dispatches a batch of TransferTokens. The Transfers will be placed into the `await_queue`
@@ -1086,7 +1062,7 @@ impl PackedVq {
tkn.await_queue = Some(Rc::clone(&await_queue));
}

let (_, next_off, next_wrap) = self.descr_ring.borrow_mut().push_batch(tkns);
let (next_off, next_wrap) = self.descr_ring.borrow_mut().push_batch(tkns);

if notif {
self.drv_event
@@ -1120,8 +1096,8 @@ impl PackedVq {
/// The `notif` parameter indicates if the driver wants to have a notification for this specific
/// transfer. This is only a performance optimization, as it is NOT ensured that the device sees the
/// updated notification flags before finishing transfers!
pub fn dispatch(&self, tkn: TransferToken, notif: bool) -> Transfer {
let (pin_tkn, next_off, next_wrap) = self.descr_ring.borrow_mut().push(tkn);
pub fn dispatch(&self, tkn: TransferToken, notif: bool) {
let (next_off, next_wrap) = self.descr_ring.borrow_mut().push(tkn);

if notif {
self.drv_event
Expand All @@ -1148,10 +1124,6 @@ impl PackedVq {

self.notif_ctrl.notify_dev(&notif_data)
}

Transfer {
transfer_tkn: Some(pin_tkn),
}
}

/// See `Virtq.index()` documentation
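One more detail from the packed-queue changes worth spelling out: `tkn_ref_ring` now stores `MaybeUninit<Box<TransferToken>>` slots, written with `MaybeUninit::new` when a buffer is made available and consumed exactly once with `assume_init_read` when the device marks it used. Below is a minimal sketch of that slot pattern with a hypothetical `TokenRing` type, not the driver's code; note that `Box::new_uninit_slice` is stable only on newer toolchains and otherwise needs the `new_uninit` nightly feature.

use std::mem::MaybeUninit;

struct TokenRing<T> {
    // Uninitialized slots; as in the driver, index 0 can simply stay unused.
    slots: Box<[MaybeUninit<Box<T>>]>,
}

impl<T> TokenRing<T> {
    fn new(size: usize) -> Self {
        Self {
            slots: Box::new_uninit_slice(size),
        }
    }

    // Store a token; the caller must not overwrite a slot that is still live,
    // since the old Box would be leaked.
    fn make_avail(&mut self, id: usize, tkn: Box<T>) {
        self.slots[id] = MaybeUninit::new(tkn);
    }

    // Safety: `id` must have been written by `make_avail` and not yet consumed,
    // otherwise this reads uninitialized memory or duplicates the Box.
    unsafe fn take(&mut self, id: usize) -> Box<T> {
        unsafe { self.slots[id].assume_init_read() }
    }
}

fn main() {
    let mut ring: TokenRing<u32> = TokenRing::new(4);
    ring.make_avail(1, Box::new(7));
    let tkn = unsafe { ring.take(1) };
    assert_eq!(*tkn, 7);
}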
(Diffs for the remaining two changed files were not loaded.)
