Skip to content

Commit

Permalink
fix: futex requeue error and implement futex key
Browse files Browse the repository at this point in the history
  • Loading branch information
YXalix committed May 10, 2024
1 parent d522a74 commit a3c53ce
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 87 deletions.
29 changes: 28 additions & 1 deletion modules/axprocess/src/futex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,42 @@ use axhal::mem::VirtAddr;
use axsync::Mutex;
use axtask::{AxTaskRef, WaitQueue};

use crate::current_process;

extern crate alloc;

/// vec中的元素分别是任务指针,对应存储时的futex变量的值
pub static FUTEX_WAIT_TASK: Mutex<BTreeMap<VirtAddr, VecDeque<(AxTaskRef, u32)>>> =
pub static FUTEX_WAIT_TASK: Mutex<BTreeMap<FutexKey, VecDeque<(AxTaskRef, u32)>>> =
Mutex::new(BTreeMap::new());

/// waiting queue which stores tasks waiting for futex variable
pub static WAIT_FOR_FUTEX: WaitQueue = WaitQueue::new();

/// Futexes are matched on equal values of this key.
///
/// The key type depends on whether it's a shared or private mapping.
/// use pid to replace the mm_struct pointer
#[derive(Copy, Clone, Default, Ord, PartialOrd, Eq, PartialEq)]
pub struct FutexKey {
ptr: u64,
word: usize,
offset: u32,
}

impl FutexKey {
fn new(ptr: u64, word: usize, offset: u32) -> Self {
Self { ptr, word, offset }
}
}
/// 获取futex变量的key
/// TODO: shared futex and error handling
pub fn get_futex_key(uaddr: VirtAddr, _flags: i32) -> FutexKey {
let ptr = current_process().pid();
let offset = uaddr.align_offset_4k() as u32;
let word = uaddr.align_down_4k().as_usize();
FutexKey::new(ptr, word, offset)
}

#[derive(Default)]
/// 用于存储 robust list 的结构
pub struct FutexRobustList {
Expand Down
2 changes: 0 additions & 2 deletions ulib/axstarry/src/syscall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ use axlog::info;

#[no_mangle]
pub fn syscall(syscall_id: usize, args: [usize; 6]) -> isize {
#[cfg(feature = "futex")]
crate::syscall_task::check_dead_wait();
#[allow(unused_mut, unused_assignments)]
let mut ans: Option<SyscallResult> = None;

Expand Down
6 changes: 0 additions & 6 deletions ulib/axstarry/src/syscall_fs/ctype/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ pub struct EpollEvent {
pub event_type: EpollEventType,
/// 事件中使用到的数据,如fd等
pub data: u64,
pub fd: i32,
pub data_u32: u32,
pub data_u64: u64,
}

numeric_enum_macro::numeric_enum! {
Expand Down Expand Up @@ -195,9 +192,6 @@ impl EpollFile {
ret_events.push(EpollEvent {
event_type: EpollEventType::EPOLLERR,
data: req_event.data,
fd: -1,
data_u32: 0,
data_u64: 0,
});
}
}
Expand Down
3 changes: 0 additions & 3 deletions ulib/axstarry/src/syscall_fs/imp/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,9 +445,6 @@ pub fn syscall_close(args: [usize; 6]) -> SyscallResult {
let ev = EpollEvent {
event_type: EpollEventType::EPOLLMSG,
data: 0,
fd: -1,
data_u32: 0,
data_u64: 0,
};
epoll_file.epoll_ctl(EpollCtl::DEL, fd as i32, ev)?;
}
Expand Down
160 changes: 85 additions & 75 deletions ulib/axstarry/src/syscall_task/imp/futex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ use axhal::mem::VirtAddr;
use axlog::info;
use axprocess::{
current_process, current_task,
futex::{FutexRobustList, FUTEX_WAIT_TASK, WAIT_FOR_FUTEX},
futex::{get_futex_key, FutexKey, FutexRobustList, FUTEX_WAIT_TASK, WAIT_FOR_FUTEX},
yield_now_task,
};
use axtask::TaskState;
use axtask::CurrentTask;

use core::time::Duration;

use crate::{FutexFlags, RobustList, SyscallError, SyscallResult, TimeSecs};

Expand All @@ -23,28 +25,50 @@ use crate::{FutexFlags, RobustList, SyscallError, SyscallResult, TimeSecs};
///
/// 不考虑检查操作
pub fn futex_requeue(wake_num: u32, move_num: usize, src_addr: VirtAddr, dst_addr: VirtAddr) {
let key = get_futex_key(src_addr, 0);
let mut futex_wait_task = FUTEX_WAIT_TASK.lock();
if !futex_wait_task.contains_key(&src_addr) {
if !futex_wait_task.contains_key(&key) {
return;
}
let src_wait_task = futex_wait_task.get_mut(&src_addr).unwrap();
let src_wait_task = futex_wait_task.get_mut(&key).unwrap();
for _ in 0..wake_num {
if let Some((task, _)) = src_wait_task.pop_front() {
WAIT_FOR_FUTEX.notify_task(false, &task);
} else {
break;
}
}

if !src_wait_task.is_empty() {
let key_new = get_futex_key(dst_addr, 0);
let move_num = move_num.min(src_wait_task.len());

let mut temp_move_task = src_wait_task.drain(..move_num).collect::<VecDeque<_>>();
let dst_wait_task = futex_wait_task.entry(dst_addr).or_default();
let dst_wait_task = futex_wait_task.entry(key_new).or_default();
dst_wait_task.append(&mut temp_move_task);
}
}

fn futex_quque(key: FutexKey, curr: &CurrentTask, val: u32) {
let mut futex_wait_task = FUTEX_WAIT_TASK.lock();
let wait_list = futex_wait_task.entry(key).or_default();
wait_list.push_back((curr.as_task_ref().clone(), val));
}

fn futex_unqueue(key: FutexKey, curr: &CurrentTask) -> bool {
let mut futex_wait_task = FUTEX_WAIT_TASK.lock();
if futex_wait_task.contains_key(&key) {
let wait_list = futex_wait_task.get_mut(&key).unwrap();
if let Some(index) = wait_list
.iter()
.position(|(task, _)| task.id() == curr.id())
{
wait_list.remove(index);
return true;
}
}
false
}

/// To do the futex operation
///
/// It may create, remove the futex wait task or requeue the futex wait task
Expand All @@ -54,76 +78,82 @@ pub fn futex(
val: u32,
timeout: usize,
vaddr2: VirtAddr,
val2: usize,
_val3: u32,
) -> Result<usize, SyscallError> {
let flag = FutexFlags::new(futex_op);
let current_task = current_task();
match flag {
FutexFlags::Wait => {
let process = current_process();
if process.manual_alloc_for_lazy(vaddr).is_ok() {
let real_futex_val = unsafe { (vaddr.as_usize() as *const u32).read_volatile() };
info!("real val: {:#x}, expected val: {:#x}", real_futex_val, val);
if real_futex_val != val {
return Err(SyscallError::EAGAIN);
}
let mut futex_wait_task = FUTEX_WAIT_TASK.lock();
let wait_list = futex_wait_task.entry(vaddr).or_default();
wait_list.push_back((current_task.as_task_ref().clone(), val));
// // 输出每一个键值对应的vec的长度
drop(futex_wait_task);
// info!("timeout: {}", timeout as u64);
// debug!("ready wait!");
if timeout == 0 {
yield_now_task();
}
#[cfg(feature = "signal")]
{
use core::time::Duration;
if timeout != 0
&& !WAIT_FOR_FUTEX.wait_timeout(Duration::from_nanos(timeout as u64))
&& process.have_signals().is_some()
{
let mut to = false;
let deadline = if timeout != 0 {
Some(Duration::from_nanos(timeout as u64) + axhal::time::current_time())
} else {
None
};
loop {
let key = get_futex_key(vaddr, futex_op);
let process = current_process();
if process.manual_alloc_for_lazy(vaddr).is_ok() {
let real_futex_val =
unsafe { (vaddr.as_usize() as *const u32).read_volatile() };
info!("real val: {:#x}, expected val: {:#x}", real_futex_val, val);
if real_futex_val != val {
return Err(SyscallError::EAGAIN);
}
futex_quque(key, &current_task, val);

if let Some(deadline) = deadline {
let now = axhal::time::current_time();
to = deadline < now;
}
if timeout == 0 || !to {
yield_now_task();
}
// If we were woken (and unqueued), we succeeded, whatever.
// TODO: plist_del, not just iterate all the list
if !futex_unqueue(key, &current_task) {
return Ok(0);
}
if to {
return Err(SyscallError::ETIMEDOUT);
}
// we expect signal_pending(current), but we might be the victim
// of a spurious wakeup as well.
#[cfg(feature = "signal")]
if process.have_signals().is_some() {
// 被信号打断
return Err(SyscallError::EINTR);
}
} else {
return Err(SyscallError::EFAULT);
}

Ok(0)
} else {
Err(SyscallError::EFAULT)
}
}
FutexFlags::Wake => {
let mut ret = 0;
let key = get_futex_key(vaddr, futex_op);
// // 当前任务释放了锁,所以不需要再次释放
let mut futex_wait_task = FUTEX_WAIT_TASK.lock();
if futex_wait_task.contains_key(&vaddr) {
let wait_list = futex_wait_task.get_mut(&vaddr).unwrap();
if futex_wait_task.contains_key(&key) {
let wait_list = futex_wait_task.get_mut(&key).unwrap();
// info!("now task: {}", wait_list.len());
loop {
if let Some((task, _)) = wait_list.pop_front() {
// 唤醒一个正在等待的任务
if task.state() != TaskState::Blocked {
// 说明自己已经醒了,那么就不在wait里面了
continue;
}
info!("wake task: {}", task.id().as_u64());
drop(futex_wait_task);
WAIT_FOR_FUTEX.notify_task(false, &task);
} else {
drop(futex_wait_task);
while let Some((task, _)) = wait_list.pop_front() {
// 唤醒一个正在等待的任务
info!("wake task: {}", task.id().as_u64());
// WAIT_FOR_FUTEX.notify_task(false, &task);
ret += 1;
if ret == val {
break;
}
break;
}
} else {
drop(futex_wait_task);
}
drop(futex_wait_task);
yield_now_task();
Ok(val as usize)
Ok(ret as usize)
}
FutexFlags::Requeue => {
// 此时timeout相当于val2,即是move_num
futex_requeue(val, timeout, vaddr, vaddr2);
futex_requeue(val, val2, vaddr, vaddr2);
Ok(0)
}
_ => {
Expand All @@ -133,27 +163,6 @@ pub fn futex(
}
}

/// To check the futex wait task
///
/// If the futex value has been changed, then wake up the task
pub fn check_dead_wait() {
let process = current_process();
let mut futex_wait_task = FUTEX_WAIT_TASK.lock();
for (vaddr, wait_list) in futex_wait_task.iter_mut() {
if process.manual_alloc_for_lazy(*vaddr).is_ok() {
let real_futex_val = unsafe { ((*vaddr).as_usize() as *const u32).read_volatile() };
for (task, val) in wait_list.iter() {
if real_futex_val != *val && task.state() == TaskState::Blocked {
WAIT_FOR_FUTEX.notify_task(false, task);
}
}
// 仅保留那些真正等待的任务
wait_list
.retain(|(task, val)| real_futex_val == *val && task.state() == TaskState::Blocked);
}
}
}

/// # Arguments
/// * vaddr: usize
/// * futex_op: i32
Expand Down Expand Up @@ -184,6 +193,7 @@ pub fn syscall_futex(args: [usize; 6]) -> SyscallResult {
futex_val,
timeout,
vaddr2.into(),
time_out_val,
val3,
) {
Ok(ans) => Ok(ans as isize),
Expand Down

0 comments on commit a3c53ce

Please sign in to comment.