Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: futex requeue error and implement futex key #49

Merged
merged 1 commit into from
May 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion modules/axprocess/src/futex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,42 @@ use axhal::mem::VirtAddr;
use axsync::Mutex;
use axtask::{AxTaskRef, WaitQueue};

use crate::current_process;

extern crate alloc;

/// vec中的元素分别是任务指针,对应存储时的futex变量的值
pub static FUTEX_WAIT_TASK: Mutex<BTreeMap<VirtAddr, VecDeque<(AxTaskRef, u32)>>> =
pub static FUTEX_WAIT_TASK: Mutex<BTreeMap<FutexKey, VecDeque<(AxTaskRef, u32)>>> =
Mutex::new(BTreeMap::new());

/// waiting queue which stores tasks waiting for futex variable
pub static WAIT_FOR_FUTEX: WaitQueue = WaitQueue::new();

/// Futexes are matched on equal values of this key.
///
/// The key type depends on whether it's a shared or private mapping.
/// use pid to replace the mm_struct pointer
#[derive(Copy, Clone, Default, Ord, PartialOrd, Eq, PartialEq)]
pub struct FutexKey {
ptr: u64,
word: usize,
offset: u32,
}

impl FutexKey {
fn new(ptr: u64, word: usize, offset: u32) -> Self {
Self { ptr, word, offset }
}
}
/// 获取futex变量的key
/// TODO: shared futex and error handling
pub fn get_futex_key(uaddr: VirtAddr, _flags: i32) -> FutexKey {
let ptr = current_process().pid();
let offset = uaddr.align_offset_4k() as u32;
let word = uaddr.align_down_4k().as_usize();
FutexKey::new(ptr, word, offset)
}

#[derive(Default)]
/// 用于存储 robust list 的结构
pub struct FutexRobustList {
Expand Down
2 changes: 0 additions & 2 deletions ulib/axstarry/src/syscall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ use axlog::info;

#[no_mangle]
pub fn syscall(syscall_id: usize, args: [usize; 6]) -> isize {
#[cfg(feature = "futex")]
crate::syscall_task::check_dead_wait();
#[allow(unused_mut, unused_assignments)]
let mut ans: Option<SyscallResult> = None;

Expand Down
6 changes: 0 additions & 6 deletions ulib/axstarry/src/syscall_fs/ctype/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ pub struct EpollEvent {
pub event_type: EpollEventType,
/// 事件中使用到的数据,如fd等
pub data: u64,
pub fd: i32,
pub data_u32: u32,
pub data_u64: u64,
}

numeric_enum_macro::numeric_enum! {
Expand Down Expand Up @@ -195,9 +192,6 @@ impl EpollFile {
ret_events.push(EpollEvent {
event_type: EpollEventType::EPOLLERR,
data: req_event.data,
fd: -1,
data_u32: 0,
data_u64: 0,
});
}
}
Expand Down
3 changes: 0 additions & 3 deletions ulib/axstarry/src/syscall_fs/imp/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,9 +445,6 @@ pub fn syscall_close(args: [usize; 6]) -> SyscallResult {
let ev = EpollEvent {
event_type: EpollEventType::EPOLLMSG,
data: 0,
fd: -1,
data_u32: 0,
data_u64: 0,
};
epoll_file.epoll_ctl(EpollCtl::DEL, fd as i32, ev)?;
}
Expand Down
160 changes: 85 additions & 75 deletions ulib/axstarry/src/syscall_task/imp/futex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ use axhal::mem::VirtAddr;
use axlog::info;
use axprocess::{
current_process, current_task,
futex::{FutexRobustList, FUTEX_WAIT_TASK, WAIT_FOR_FUTEX},
futex::{get_futex_key, FutexKey, FutexRobustList, FUTEX_WAIT_TASK, WAIT_FOR_FUTEX},
yield_now_task,
};
use axtask::TaskState;
use axtask::CurrentTask;

use core::time::Duration;

use crate::{FutexFlags, RobustList, SyscallError, SyscallResult, TimeSecs};

Expand All @@ -23,28 +25,50 @@ use crate::{FutexFlags, RobustList, SyscallError, SyscallResult, TimeSecs};
///
/// 不考虑检查操作
pub fn futex_requeue(wake_num: u32, move_num: usize, src_addr: VirtAddr, dst_addr: VirtAddr) {
let key = get_futex_key(src_addr, 0);
let mut futex_wait_task = FUTEX_WAIT_TASK.lock();
if !futex_wait_task.contains_key(&src_addr) {
if !futex_wait_task.contains_key(&key) {
return;
}
let src_wait_task = futex_wait_task.get_mut(&src_addr).unwrap();
let src_wait_task = futex_wait_task.get_mut(&key).unwrap();
for _ in 0..wake_num {
if let Some((task, _)) = src_wait_task.pop_front() {
WAIT_FOR_FUTEX.notify_task(false, &task);
} else {
break;
}
}

if !src_wait_task.is_empty() {
let key_new = get_futex_key(dst_addr, 0);
let move_num = move_num.min(src_wait_task.len());

let mut temp_move_task = src_wait_task.drain(..move_num).collect::<VecDeque<_>>();
let dst_wait_task = futex_wait_task.entry(dst_addr).or_default();
let dst_wait_task = futex_wait_task.entry(key_new).or_default();
dst_wait_task.append(&mut temp_move_task);
}
}

fn futex_quque(key: FutexKey, curr: &CurrentTask, val: u32) {
let mut futex_wait_task = FUTEX_WAIT_TASK.lock();
let wait_list = futex_wait_task.entry(key).or_default();
wait_list.push_back((curr.as_task_ref().clone(), val));
}

fn futex_unqueue(key: FutexKey, curr: &CurrentTask) -> bool {
let mut futex_wait_task = FUTEX_WAIT_TASK.lock();
if futex_wait_task.contains_key(&key) {
let wait_list = futex_wait_task.get_mut(&key).unwrap();
if let Some(index) = wait_list
.iter()
.position(|(task, _)| task.id() == curr.id())
{
wait_list.remove(index);
return true;
}
}
false
}

/// To do the futex operation
///
/// It may create, remove the futex wait task or requeue the futex wait task
Expand All @@ -54,76 +78,82 @@ pub fn futex(
val: u32,
timeout: usize,
vaddr2: VirtAddr,
val2: usize,
_val3: u32,
) -> Result<usize, SyscallError> {
let flag = FutexFlags::new(futex_op);
let current_task = current_task();
match flag {
FutexFlags::Wait => {
let process = current_process();
if process.manual_alloc_for_lazy(vaddr).is_ok() {
let real_futex_val = unsafe { (vaddr.as_usize() as *const u32).read_volatile() };
info!("real val: {:#x}, expected val: {:#x}", real_futex_val, val);
if real_futex_val != val {
return Err(SyscallError::EAGAIN);
}
let mut futex_wait_task = FUTEX_WAIT_TASK.lock();
let wait_list = futex_wait_task.entry(vaddr).or_default();
wait_list.push_back((current_task.as_task_ref().clone(), val));
// // 输出每一个键值对应的vec的长度
drop(futex_wait_task);
// info!("timeout: {}", timeout as u64);
// debug!("ready wait!");
if timeout == 0 {
yield_now_task();
}
#[cfg(feature = "signal")]
{
use core::time::Duration;
if timeout != 0
&& !WAIT_FOR_FUTEX.wait_timeout(Duration::from_nanos(timeout as u64))
&& process.have_signals().is_some()
{
let mut to = false;
let deadline = if timeout != 0 {
Some(Duration::from_nanos(timeout as u64) + axhal::time::current_time())
} else {
None
};
loop {
let key = get_futex_key(vaddr, futex_op);
let process = current_process();
if process.manual_alloc_for_lazy(vaddr).is_ok() {
let real_futex_val =
unsafe { (vaddr.as_usize() as *const u32).read_volatile() };
info!("real val: {:#x}, expected val: {:#x}", real_futex_val, val);
if real_futex_val != val {
return Err(SyscallError::EAGAIN);
}
futex_quque(key, &current_task, val);

if let Some(deadline) = deadline {
let now = axhal::time::current_time();
to = deadline < now;
}
if timeout == 0 || !to {
yield_now_task();
}
// If we were woken (and unqueued), we succeeded, whatever.
// TODO: plist_del, not just iterate all the list
if !futex_unqueue(key, &current_task) {
return Ok(0);
}
if to {
return Err(SyscallError::ETIMEDOUT);
}
// we expect signal_pending(current), but we might be the victim
// of a spurious wakeup as well.
#[cfg(feature = "signal")]
if process.have_signals().is_some() {
// 被信号打断
return Err(SyscallError::EINTR);
}
} else {
return Err(SyscallError::EFAULT);
}

Ok(0)
} else {
Err(SyscallError::EFAULT)
}
}
FutexFlags::Wake => {
let mut ret = 0;
let key = get_futex_key(vaddr, futex_op);
// // 当前任务释放了锁,所以不需要再次释放
let mut futex_wait_task = FUTEX_WAIT_TASK.lock();
if futex_wait_task.contains_key(&vaddr) {
let wait_list = futex_wait_task.get_mut(&vaddr).unwrap();
if futex_wait_task.contains_key(&key) {
let wait_list = futex_wait_task.get_mut(&key).unwrap();
// info!("now task: {}", wait_list.len());
loop {
if let Some((task, _)) = wait_list.pop_front() {
// 唤醒一个正在等待的任务
if task.state() != TaskState::Blocked {
// 说明自己已经醒了,那么就不在wait里面了
continue;
}
info!("wake task: {}", task.id().as_u64());
drop(futex_wait_task);
WAIT_FOR_FUTEX.notify_task(false, &task);
} else {
drop(futex_wait_task);
while let Some((task, _)) = wait_list.pop_front() {
// 唤醒一个正在等待的任务
info!("wake task: {}", task.id().as_u64());
// WAIT_FOR_FUTEX.notify_task(false, &task);
ret += 1;
if ret == val {
break;
}
break;
}
} else {
drop(futex_wait_task);
}
drop(futex_wait_task);
yield_now_task();
Ok(val as usize)
Ok(ret as usize)
}
FutexFlags::Requeue => {
// 此时timeout相当于val2,即是move_num
futex_requeue(val, timeout, vaddr, vaddr2);
futex_requeue(val, val2, vaddr, vaddr2);
Ok(0)
}
_ => {
Expand All @@ -133,27 +163,6 @@ pub fn futex(
}
}

/// To check the futex wait task
///
/// If the futex value has been changed, then wake up the task
pub fn check_dead_wait() {
let process = current_process();
let mut futex_wait_task = FUTEX_WAIT_TASK.lock();
for (vaddr, wait_list) in futex_wait_task.iter_mut() {
if process.manual_alloc_for_lazy(*vaddr).is_ok() {
let real_futex_val = unsafe { ((*vaddr).as_usize() as *const u32).read_volatile() };
for (task, val) in wait_list.iter() {
if real_futex_val != *val && task.state() == TaskState::Blocked {
WAIT_FOR_FUTEX.notify_task(false, task);
}
}
// 仅保留那些真正等待的任务
wait_list
.retain(|(task, val)| real_futex_val == *val && task.state() == TaskState::Blocked);
}
}
}

/// # Arguments
/// * vaddr: usize
/// * futex_op: i32
Expand Down Expand Up @@ -184,6 +193,7 @@ pub fn syscall_futex(args: [usize; 6]) -> SyscallResult {
futex_val,
timeout,
vaddr2.into(),
time_out_val,
val3,
) {
Ok(ans) => Ok(ans as isize),
Expand Down
Loading