Skip to content

Commit

Permalink
modify futex
Browse files Browse the repository at this point in the history
  • Loading branch information
Baiqs11 committed Jun 25, 2024
1 parent 0dfe1b3 commit 7186ab2
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 71 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apps/c/futex/futex.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ int main() {
int *futex_ptr = &futex_var;

// 唤醒等待在 futex 上的线程(私有唤醒)
syscall(SYS_futex, futex_ptr, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
syscall(SYS_futex, futex_ptr, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, 0);

return 0;
}
1 change: 1 addition & 0 deletions ulib/axstarry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ axlibc = { path = "../../ulib/axlibc", features = ["fp_simd", "alloc", "fs", "fd
axconfig = { path = "../../modules/axconfig" }
axsync = { path = "../../modules/axsync", optional = true }
axmem = { path = "../../modules/axmem" }
axstd = { path = "../../ulib/axstd" }

crate_interface = { path = "../../crates/crate_interface" }
eventfd = { path = "../../crates/eventfd" }
Expand Down
9 changes: 9 additions & 0 deletions ulib/axstarry/src/ctypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,12 @@ pub enum FutexFlags {
Wake,
/// 将等待 uaddr 的线程移动到 uaddr2
Requeue,
/// 同Wait, 多了一个bitset参数,表示等待的位
WaitBitset,
/// 同Wake, 多了一个bitset参数,表示等待的位
WakeBitset,
/// 设置墙上时间
RealTime,
/// 不支持的操作
Unsupported,
}
Expand All @@ -299,6 +305,9 @@ impl FutexFlags {
0 => FutexFlags::Wait,
1 => FutexFlags::Wake,
3 => FutexFlags::Requeue,
9 => FutexFlags::WaitBitset,
10 => FutexFlags::WakeBitset,
256 => FutexFlags::RealTime,
_ => FutexFlags::Unsupported,
}
}
Expand Down
269 changes: 199 additions & 70 deletions ulib/axstarry/src/syscall_task/imp/futex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use axprocess::{
};
use axtask::CurrentTask;

pub use axstd::time::Instant;
use core::time::Duration;

use crate::{FutexFlags, RobustList, SyscallError, SyscallResult, TimeSecs};
Expand Down Expand Up @@ -69,6 +70,196 @@ fn futex_unqueue(key: FutexKey, curr: &CurrentTask) -> bool {
false
}

fn futex_wait(
vaddr: VirtAddr,
futex_op: i32,
val: u32,
timeout: usize,
) -> Result<usize, SyscallError> {
let mut to = false;
let current_task = current_task();
let deadline = if timeout != 0 {
Some(Duration::from_nanos(timeout as u64) + axhal::time::current_time())
} else {
None
};
loop {
let key = get_futex_key(vaddr, futex_op);
let process = current_process();
if process.manual_alloc_for_lazy(vaddr).is_ok() {
let real_futex_val = unsafe { (vaddr.as_usize() as *const u32).read_volatile() };
info!("real val: {:#x}, expected val: {:#x}", real_futex_val, val);
if real_futex_val != val {
return Err(SyscallError::EAGAIN);
}
futex_quque(key, &current_task, val);

if let Some(deadline) = deadline {
let now = axhal::time::current_time();
to = deadline < now;
}
if timeout == 0 || !to {
yield_now_task();
}
// If we were woken (and unqueued), we succeeded, whatever.
// TODO: plist_del, not just iterate all the list
if !futex_unqueue(key, &current_task) {
return Ok(0);
}
if to {
return Err(SyscallError::ETIMEDOUT);
}
// we expect signal_pending(current), but we might be the victim
// of a spurious wakeup as well.
#[cfg(feature = "signal")]
if process.have_signals().is_some() {
// 被信号打断
return Err(SyscallError::EINTR);
}
} else {
return Err(SyscallError::EFAULT);
}
}
}

enum MyResult {
InstantResult(Instant),
DurationResult(Duration),
}

fn get_deadline(futex_op: i32, timeout: usize) -> Option<MyResult> {
const FUTEX_WAIT_BITSET: i32 = 9;
const FUTEX_CLOCK_REALTIME: i32 = 25;
match futex_op {
FUTEX_CLOCK_REALTIME => {
let now = Instant::now();
let timeout_duration = Duration::from_nanos(timeout as u64);
let instant_deadline = now.checked_add(timeout_duration)?;
Some(MyResult::InstantResult(instant_deadline))
}
FUTEX_WAIT_BITSET => {
let duration_deadline =
Duration::from_nanos(timeout as u64) + axhal::time::current_time();
Some(MyResult::DurationResult(duration_deadline))
}
_ => None,
}
}

fn is_timed_out(deadline: &Option<MyResult>) -> bool {
if let Some(ref deadline) = deadline {
match deadline {
MyResult::InstantResult(instant) => {
return Instant::now().duration_since(*instant) > Duration::from_secs(0);
}
MyResult::DurationResult(duration) => {
return axhal::time::current_time() > *duration;
}
}
}
false
}

fn futex_wait_bitset(
vaddr: VirtAddr,
futex_op: i32,
val: u32,
timeout: usize,
bitset: u32,
) -> Result<usize, SyscallError> {
let current_task = current_task();
let deadline = get_deadline(futex_op, timeout);
loop {
let key = get_futex_key(vaddr, futex_op);
let process = current_process();
if process.manual_alloc_for_lazy(vaddr).is_ok() {
let real_futex_val = unsafe { (vaddr.as_usize() as *const u32).read_volatile() };
info!("real val: {:#x}, expected val: {:#x}", real_futex_val, val);
if real_futex_val != val {
return Err(SyscallError::EAGAIN);
}
futex_quque(key, &current_task, val);

if (real_futex_val & bitset) == bitset {
return Ok(0);
}
if timeout == 0 || !is_timed_out(&deadline) {
yield_now_task();
}
// If we were woken (and unqueued), we succeeded, whatever.
// TODO: plist_del, not just iterate all the list
if !futex_unqueue(key, &current_task) {
return Ok(0);
}
if is_timed_out(&deadline) {
return Err(SyscallError::ETIMEDOUT);
}
// we expect signal_pending(current), but we might be the victim
// of a spurious wakeup as well.
#[cfg(feature = "signal")]
if process.have_signals().is_some() {
// 被信号打断
return Err(SyscallError::EINTR);
}
} else {
return Err(SyscallError::EFAULT);
}
}
}

fn futex_wake(vaddr: VirtAddr, futex_op: i32, val: u32) -> Result<usize, SyscallError> {
let mut ret = 0;
let key = get_futex_key(vaddr, futex_op);
// 当前任务释放了锁,所以不需要再次释放
let mut futex_wait_task = FUTEX_WAIT_TASK.lock();
if futex_wait_task.contains_key(&key) {
let wait_list = futex_wait_task.get_mut(&key).unwrap();
// info!("now task: {}", wait_list.len());
while let Some((task, _)) = wait_list.pop_front() {
// 唤醒一个正在等待的任务
info!("wake task: {}", task.id().as_u64());
// WAIT_FOR_FUTEX.notify_task(false, &task);
ret += 1;
if ret == val {
break;
}
}
}
drop(futex_wait_task);
yield_now_task();
Ok(ret as usize)
}

fn futex_wake_bitset(
vaddr: VirtAddr,
futex_op: i32,
val: u32,
bitset: u32,
) -> Result<usize, SyscallError> {
let mut ret = 0;
let key = get_futex_key(vaddr, futex_op);
let mut futex_wait_task = FUTEX_WAIT_TASK.lock();
if let Some(wait_list) = futex_wait_task.get_mut(&key) {
for (task, task_bitset) in wait_list.iter() {
let wakeup = (val == 0 || ret < val) && ((*task_bitset & bitset) != 0);
if wakeup {
WAIT_FOR_FUTEX.notify_task(false, &task);
ret += 1;
}
if val != 0 && ret >= val {
break;
}
}

wait_list.retain(|(_, task_bitset)| (*task_bitset & !bitset) != 0);
if wait_list.is_empty() {
futex_wait_task.remove(&key);
}
}
drop(futex_wait_task);
Ok(ret as usize)
}

/// To do the futex operation
///
/// It may create, remove the futex wait task or requeue the futex wait task
Expand All @@ -79,83 +270,21 @@ pub fn futex(
timeout: usize,
vaddr2: VirtAddr,
val2: usize,
_val3: u32,
val3: u32,
) -> Result<usize, SyscallError> {
let flag = FutexFlags::new(futex_op);
let current_task = current_task();
match flag {
FutexFlags::Wait => {
let mut to = false;
let deadline = if timeout != 0 {
Some(Duration::from_nanos(timeout as u64) + axhal::time::current_time())
} else {
None
};
loop {
let key = get_futex_key(vaddr, futex_op);
let process = current_process();
if process.manual_alloc_for_lazy(vaddr).is_ok() {
let real_futex_val =
unsafe { (vaddr.as_usize() as *const u32).read_volatile() };
info!("real val: {:#x}, expected val: {:#x}", real_futex_val, val);
if real_futex_val != val {
return Err(SyscallError::EAGAIN);
}
futex_quque(key, &current_task, val);

if let Some(deadline) = deadline {
let now = axhal::time::current_time();
to = deadline < now;
}
if timeout == 0 || !to {
yield_now_task();
}
// If we were woken (and unqueued), we succeeded, whatever.
// TODO: plist_del, not just iterate all the list
if !futex_unqueue(key, &current_task) {
return Ok(0);
}
if to {
return Err(SyscallError::ETIMEDOUT);
}
// we expect signal_pending(current), but we might be the victim
// of a spurious wakeup as well.
#[cfg(feature = "signal")]
if process.have_signals().is_some() {
// 被信号打断
return Err(SyscallError::EINTR);
}
} else {
return Err(SyscallError::EFAULT);
}
}
}
FutexFlags::Wake => {
let mut ret = 0;
let key = get_futex_key(vaddr, futex_op);
// // 当前任务释放了锁,所以不需要再次释放
let mut futex_wait_task = FUTEX_WAIT_TASK.lock();
if futex_wait_task.contains_key(&key) {
let wait_list = futex_wait_task.get_mut(&key).unwrap();
// info!("now task: {}", wait_list.len());
while let Some((task, _)) = wait_list.pop_front() {
// 唤醒一个正在等待的任务
info!("wake task: {}", task.id().as_u64());
// WAIT_FOR_FUTEX.notify_task(false, &task);
ret += 1;
if ret == val {
break;
}
}
}
drop(futex_wait_task);
yield_now_task();
Ok(ret as usize)
}
match flag {
FutexFlags::Wait => futex_wait(vaddr, futex_op, val, timeout),
FutexFlags::Wake => futex_wake(vaddr, futex_op, val),
FutexFlags::Requeue => {
futex_requeue(val, val2, vaddr, vaddr2);
Ok(0)
}
FutexFlags::WaitBitset | FutexFlags::RealTime => {
futex_wait_bitset(vaddr, futex_op, val, timeout, val3)
}
FutexFlags::WakeBitset => futex_wake_bitset(vaddr, futex_op, val, val3),
_ => {
Err(SyscallError::EINVAL)
// return Ok(0);
Expand Down
8 changes: 8 additions & 0 deletions ulib/axstd/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ impl Instant {
Instant(arceos_api::time::ax_current_time())
}

/// Creates an `Instant` from the specified number of seconds and nanoseconds.
pub fn from_nanos(nanos: u64) -> Instant {
const NANOS_PER_SEC: u32 = 1_000_000_000;
let secs = nanos / NANOS_PER_SEC as u64;
let nanos = (nanos % NANOS_PER_SEC as u64) as u32;
Instant(AxTimeValue::new(secs, nanos))
}

/// Returns the amount of time elapsed from another instant to this one,
/// or zero duration if that instant is later than this one.
///
Expand Down

0 comments on commit 7186ab2

Please sign in to comment.