Skip to content

Commit

Permalink
virtq: use async_channel for moving buffers to the callers
Browse files Browse the repository at this point in the history
  • Loading branch information
cagatay-y committed May 6, 2024
1 parent ee58b0f commit 1354cb2
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 91 deletions.
46 changes: 43 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ async-lock = { version = "3.3.0", default-features = false }
simple-shell = { version = "0.0.1", optional = true }
volatile = "0.5.4"
anstyle = { version = "1", default-features = false }
async-channel = { version = "2.2.1", default-features = false }

[dependencies.smoltcp]
version = "0.11"
Expand Down
15 changes: 2 additions & 13 deletions src/drivers/net/virtio_mmio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@
//!
//! The module contains ...
use alloc::collections::VecDeque;
use alloc::rc::Rc;
use alloc::vec::Vec;
use core::cell::RefCell;
use core::ptr;
use core::ptr::read_volatile;
use core::str::FromStr;
Expand Down Expand Up @@ -138,17 +136,8 @@ impl VirtioNetDriver {
isr_stat,
notif_cfg,
ctrl_vq: CtrlQueue::new(None),
recv_vqs: RxQueues::new(
Vec::<Rc<dyn Virtq>>::new(),
Rc::new(RefCell::new(VecDeque::new())),
false,
),
send_vqs: TxQueues::new(
Vec::<Rc<dyn Virtq>>::new(),
Rc::new(RefCell::new(VecDeque::new())),
Vec::new(),
false,
),
recv_vqs: RxQueues::new(Vec::<Rc<dyn Virtq>>::new(), false),
send_vqs: TxQueues::new(Vec::<Rc<dyn Virtq>>::new(), Vec::new(), false),
num_vqs: 0,
irq,
mtu,
Expand Down
61 changes: 29 additions & 32 deletions src/drivers/net/virtio_net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
//! The module contains ...
use alloc::boxed::Box;
use alloc::collections::VecDeque;
use alloc::rc::Rc;
use alloc::vec::Vec;
use core::cell::RefCell;
use core::cmp::Ordering;
use core::mem;

Expand Down Expand Up @@ -154,19 +152,18 @@ enum MqCmd {

pub struct RxQueues {
vqs: Vec<Rc<dyn Virtq>>,
poll_queue: Rc<RefCell<VecDeque<Box<BufferToken>>>>,
poll_sender: async_channel::Sender<Box<BufferToken>>,
poll_receiver: async_channel::Receiver<Box<BufferToken>>,
is_multi: bool,
}

impl RxQueues {
pub fn new(
vqs: Vec<Rc<dyn Virtq>>,
poll_queue: Rc<RefCell<VecDeque<Box<BufferToken>>>>,
is_multi: bool,
) -> Self {
pub fn new(vqs: Vec<Rc<dyn Virtq>>, is_multi: bool) -> Self {
let (poll_sender, poll_receiver) = async_channel::unbounded();
Self {
vqs,
poll_queue,
poll_sender,
poll_receiver,
is_multi,
}
}
Expand Down Expand Up @@ -212,7 +209,7 @@ impl RxQueues {
// Transfers will be awaited at the queue
buff_tkn
.provide()
.dispatch_await(Rc::clone(&self.poll_queue), false);
.dispatch_await(self.poll_sender.clone(), false);
}

// Safe virtqueue
Expand All @@ -224,14 +221,16 @@ impl RxQueues {
}

fn get_next(&mut self) -> Option<Box<BufferToken>> {
let transfer = self.poll_queue.borrow_mut().pop_front();
let transfer = self.poll_receiver.try_recv();

transfer.or_else(|| {
// Check if any not yet provided transfers are in the queue.
self.poll();
transfer
.or_else(|_| {
// Check if any not yet provided transfers are in the queue.
self.poll();

self.poll_queue.borrow_mut().pop_front()
})
self.poll_receiver.try_recv()
})
.ok()
}

fn poll(&self) {
Expand Down Expand Up @@ -269,23 +268,21 @@ impl RxQueues {
/// to the respective queue structures.
pub struct TxQueues {
vqs: Vec<Rc<dyn Virtq>>,
poll_queue: Rc<RefCell<VecDeque<Box<BufferToken>>>>,
poll_sender: async_channel::Sender<Box<BufferToken>>,
poll_receiver: async_channel::Receiver<Box<BufferToken>>,
ready_queue: Vec<BufferToken>,
/// Indicates, whether the Driver/Device are using multiple
/// queues for communication.
is_multi: bool,
}

impl TxQueues {
pub fn new(
vqs: Vec<Rc<dyn Virtq>>,
poll_queue: Rc<RefCell<VecDeque<Box<BufferToken>>>>,
ready_queue: Vec<BufferToken>,
is_multi: bool,
) -> Self {
pub fn new(vqs: Vec<Rc<dyn Virtq>>, ready_queue: Vec<BufferToken>, is_multi: bool) -> Self {
let (poll_sender, poll_receiver) = async_channel::unbounded();
Self {
vqs,
poll_queue,
poll_sender,
poll_receiver,
ready_queue,
is_multi,
}
Expand Down Expand Up @@ -408,11 +405,11 @@ impl TxQueues {
}
}

if self.poll_queue.borrow().is_empty() {
if self.poll_receiver.is_empty() {
self.poll();
}

while let Some(buffer_token) = self.poll_queue.borrow_mut().pop_back() {
while let Ok(buffer_token) = self.poll_receiver.try_recv() {
let mut tkn = buffer_token.reset();
let (send_len, _) = tkn.len();

Expand Down Expand Up @@ -484,7 +481,7 @@ impl NetworkDriver for VirtioNetDriver {
#[allow(dead_code)]
fn has_packet(&self) -> bool {
self.recv_vqs.poll();
!self.recv_vqs.poll_queue.borrow().is_empty()
!self.recv_vqs.poll_receiver.is_empty()
}

/// Provides smoltcp a slice to copy the IP packet and transfer the packet
Expand Down Expand Up @@ -547,7 +544,7 @@ impl NetworkDriver for VirtioNetDriver {

buff_tkn
.provide()
.dispatch_await(Rc::clone(&self.send_vqs.poll_queue), false);
.dispatch_await(self.send_vqs.poll_sender.clone(), false);

result
} else {
Expand Down Expand Up @@ -581,7 +578,7 @@ impl NetworkDriver for VirtioNetDriver {
transfer
.reset()
.provide()
.dispatch_await(Rc::clone(&self.recv_vqs.poll_queue), false);
.dispatch_await(self.recv_vqs.poll_sender.clone(), false);

return None;
}
Expand All @@ -598,7 +595,7 @@ impl NetworkDriver for VirtioNetDriver {
transfer
.reset()
.provide()
.dispatch_await(Rc::clone(&self.recv_vqs.poll_queue), false);
.dispatch_await(self.recv_vqs.poll_sender.clone(), false);

num_buffers
};
Expand All @@ -620,7 +617,7 @@ impl NetworkDriver for VirtioNetDriver {
transfer
.reset()
.provide()
.dispatch_await(Rc::clone(&self.recv_vqs.poll_queue), false);
.dispatch_await(self.recv_vqs.poll_sender.clone(), false);
}

Some((RxToken::new(vec_data), TxToken::new()))
Expand All @@ -631,7 +628,7 @@ impl NetworkDriver for VirtioNetDriver {
.write_seq(None::<&VirtioNetHdr>, Some(&VirtioNetHdr::default()))
.unwrap()
.provide()
.dispatch_await(Rc::clone(&self.recv_vqs.poll_queue), false);
.dispatch_await(self.recv_vqs.poll_sender.clone(), false);

None
}
Expand Down
12 changes: 2 additions & 10 deletions src/drivers/net/virtio_pci.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
//!
//! The module contains ...
use alloc::collections::VecDeque;
use alloc::rc::Rc;
use alloc::vec::Vec;
use core::cell::RefCell;
use core::str::FromStr;

use smoltcp::phy::ChecksumCapabilities;
Expand Down Expand Up @@ -146,13 +143,8 @@ impl VirtioNetDriver {
notif_cfg,

ctrl_vq: CtrlQueue::new(None),
recv_vqs: RxQueues::new(Vec::new(), Rc::new(RefCell::new(VecDeque::new())), false),
send_vqs: TxQueues::new(
Vec::new(),
Rc::new(RefCell::new(VecDeque::new())),
Vec::new(),
false,
),
recv_vqs: RxQueues::new(Vec::new(), false),
send_vqs: TxQueues::new(Vec::new(), Vec::new(), false),
num_vqs: 0,
irq: device.get_irq().unwrap(),
mtu,
Expand Down
Loading

0 comments on commit 1354cb2

Please sign in to comment.