Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[inetstack] Move TCP stack to use shared state machine #1008

Merged
merged 3 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/rust/catnip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,7 @@ impl CatnipLibOS {
return Err(Fail::new(libc::EINVAL, "zero-length buffer"));
}

let handle: TaskHandle = self.do_push(qd, buf)?;
let qt: QToken = handle.get_task_id().into();
Ok(qt)
self.do_push(qd, buf)
},
Err(e) => Err(e),
}
Expand Down
4 changes: 1 addition & 3 deletions src/rust/catpowder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,7 @@ impl CatpowderLibOS {
if buf.len() == 0 {
return Err(Fail::new(libc::EINVAL, "zero-length buffer"));
}
let handle: TaskHandle = self.do_push(qd, buf)?;
let qt: QToken = handle.get_task_id().into();
Ok(qt)
self.do_push(qd, buf)
},
Err(e) => Err(e),
}
Expand Down
69 changes: 20 additions & 49 deletions src/rust/inetstack/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,7 @@ impl<const N: usize> InetStack<N> {

// Search for target queue descriptor.
match self.runtime.get_queue_type(&qd)? {
QType::TcpSocket => {
let coroutine: Pin<Box<Operation>> = self.ipv4.tcp.accept(qd)?;
let task_id: String = format!("Inetstack::TCP::accept for qd={:?}", qd);
let handle: TaskHandle = self.runtime.insert_coroutine(task_id.as_str(), coroutine)?;
Ok(handle.get_task_id().into())
},
QType::TcpSocket => self.ipv4.tcp.accept(qd),
// This queue descriptor does not concern a TCP socket.
_ => Err(Fail::new(libc::EINVAL, "invalid queue type")),
}
Expand All @@ -281,12 +276,7 @@ impl<const N: usize> InetStack<N> {
let remote: SocketAddrV4 = unwrap_socketaddr(remote)?;

match self.runtime.get_queue_type(&qd)? {
QType::TcpSocket => {
let coroutine: Pin<Box<Operation>> = self.ipv4.tcp.connect(qd, remote)?;
let task_id: String = format!("Inetstack::TCP::connect for qd={:?}", qd);
let handle: TaskHandle = self.runtime.insert_coroutine(task_id.as_str(), coroutine)?;
Ok(handle.get_task_id().into())
},
QType::TcpSocket => self.ipv4.tcp.connect(qd, remote),
_ => Err(Fail::new(libc::EINVAL, "invalid queue type")),
}
}
Expand Down Expand Up @@ -328,12 +318,8 @@ impl<const N: usize> InetStack<N> {
timer!("inetstack::async_close");
trace!("async_close(): qd={:?}", qd);

let (task_id, coroutine): (String, Pin<Box<Operation>>) = match self.runtime.get_queue_type(&qd)? {
QType::TcpSocket => {
let task_id: String = format!("Inetstack::TCP::close for qd={:?}", qd);
let coroutine: Pin<Box<Operation>> = self.ipv4.tcp.async_close(qd)?;
(task_id, coroutine)
},
match self.runtime.get_queue_type(&qd)? {
QType::TcpSocket => self.ipv4.tcp.async_close(qd),
QType::UdpSocket => {
self.ipv4.udp.close(qd)?;
let task_id: String = format!("Inetstack::UDP::close for qd={:?}", qd);
Expand All @@ -346,26 +332,20 @@ impl<const N: usize> InetStack<N> {
.expect("queue should exist");
(qd, OperationResult::Close)
});
(task_id, coroutine)
let handle: TaskHandle = self.runtime.insert_coroutine(task_id.as_str(), coroutine)?;
let qt: QToken = handle.get_task_id().into();
trace!("async_close() qt={:?}", qt);
Ok(qt)
},
_ => return Err(Fail::new(libc::EINVAL, "invalid queue type")),
};

let handle: TaskHandle = self.runtime.insert_coroutine(task_id.as_str(), coroutine)?;
let qt: QToken = handle.get_task_id().into();
trace!("async_close() qt={:?}", qt);
Ok(qt)
_ => Err(Fail::new(libc::EINVAL, "invalid queue type")),
}
}

/// Pushes a buffer to a TCP socket.
/// TODO: Rename this function to push() once we have a common representation across all libOSes.
pub fn do_push(&mut self, qd: QDesc, buf: DemiBuffer) -> Result<TaskHandle, Fail> {
pub fn do_push(&mut self, qd: QDesc, buf: DemiBuffer) -> Result<QToken, Fail> {
match self.runtime.get_queue_type(&qd)? {
QType::TcpSocket => {
let coroutine: Pin<Box<Operation>> = self.ipv4.tcp.push(qd, buf)?;
let task_id: String = format!("Inetstack::TCP::push for qd={:?}", qd);
self.runtime.insert_coroutine(task_id.as_str(), coroutine)
},
QType::TcpSocket => self.ipv4.tcp.push(qd, buf),
_ => Err(Fail::new(libc::EINVAL, "invalid queue type")),
}
}
Expand All @@ -384,10 +364,7 @@ impl<const N: usize> InetStack<N> {
}

// Issue operation.
let handle: TaskHandle = self.do_push(qd, buf)?;
let qt: QToken = handle.get_task_id().into();
trace!("push2() qt={:?}", qt);
Ok(qt)
self.do_push(qd, buf)
}

/// Pushes a buffer to a UDP socket.
Expand Down Expand Up @@ -436,24 +413,18 @@ impl<const N: usize> InetStack<N> {
// We just assert 'size' here, because it was previously checked at PDPIX layer.
debug_assert!(size.is_none() || ((size.unwrap() > 0) && (size.unwrap() <= limits::POP_SIZE_MAX)));

let (task_id, coroutine): (String, Pin<Box<Operation>>) = match self.runtime.get_queue_type(&qd)? {
QType::TcpSocket => {
let task_id: String = format!("Inetstack::TCP::pop for qd={:?}", qd);
let coroutine: Pin<Box<Operation>> = self.ipv4.tcp.pop(qd, size)?;
(task_id, coroutine)
},
match self.runtime.get_queue_type(&qd)? {
QType::TcpSocket => self.ipv4.tcp.pop(qd, size),
QType::UdpSocket => {
let task_id: String = format!("Inetstack::UDP::pop for qd={:?}", qd);
let coroutine: Pin<Box<Operation>> = self.ipv4.udp.pop(qd, size)?;
(task_id, coroutine)
let handle: TaskHandle = self.runtime.insert_coroutine(task_id.as_str(), coroutine)?;
let qt: QToken = handle.get_task_id().into();
trace!("async_close() qt={:?}", qt);
Ok(qt)
},
_ => return Err(Fail::new(libc::EINVAL, "invalid queue type")),
};

let handle: TaskHandle = self.runtime.insert_coroutine(task_id.as_str(), coroutine)?;
let qt: QToken = handle.get_task_id().into();
trace!("pop() qt={:?}", qt);
Ok(qt)
}
}

/// Waits for an operation to complete.
Expand Down
2 changes: 1 addition & 1 deletion src/rust/inetstack/protocols/tcp/passive_open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl<const N: usize> SharedPassiveSocket<N> {
self.local
}

pub async fn accept(&mut self, yielder: Yielder) -> Result<EstablishedSocket<N>, Fail> {
pub async fn do_accept(&mut self, yielder: Yielder) -> Result<EstablishedSocket<N>, Fail> {
self.ready.pop(yielder).await
}

Expand Down
Loading