-
Thank you to the developers of I have an embedded Linux UDP server I'm trying to use with pub async fn main_loop(self) -> Result<(), Error> {
let default_addr: SocketAddr = "0.0.0.0:12000".parse().unwrap();
let socket = UdpSocket::bind(self.addr.unwrap_or(default_addr)).await?;
let mut buf = vec![0u8; 1024];
let ex = Rc::new(LocalExecutor::new());
let mut dispatch = Dispatch::<Context, Error, 16>::new(Context::new(&ex, socket.clone()));
dispatch.add_handler::<EchoEndpoint>(echo_handler).map_err(|s| Error::Init(s))?;
loop {
let (n, addr) = socket.recv_from(&mut buf).await?;
dispatch.context().addr = Some(addr);
match dispatch.dispatch(&mut buf[..n]) {
Ok(_) => {},
Err(e) => { todo!("Need to handle error: {:?}", e) },
}
}
} The overview of the RPC crate I'm using to provide fn echo_handler<'ex, 'b>(hdr: &WireHeader, ctx: &mut Context<'ex, 'b>, bytes: &[u8]) -> Result<(), Error> {
let Context { ex, sock, addr , .. } = ctx;
let addr = addr.unwrap().clone();
let sock = sock.clone();
let ex = Rc::clone(ex);
// let ctx = ctx.clone();
match postcard::from_bytes::<Echo>(bytes) {
Ok(msg) => {
ctx.ex.spawn(echo_task(ex, hdr.seq_no, hdr.key, (sock, addr), msg.0)).detach();
ctx.ex.try_tick();
Ok(())
},
Err(e) => {
Err(Error::Parse(e))
}
}
}
async fn echo_task<'a, 'ex>(_ex: Rc<LocalExecutor<'ex>>, seq_no: u32, key: Key, (sock, addr): (UdpSocket, SocketAddr), msg: String) {
let resp = EchoResponse(msg.to_uppercase());
let mut buf = vec![0u8; 1024];
if let Ok(used) = postcard_rpc::headered::to_slice_keyed(seq_no, key, &resp, &mut buf) {
let _ = sock.send_to(used, addr).await;
}
}
What I've noticed is that unless I immediately tell the Is |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments
-
Tasks do not run except inside of a scheduler. There are many reasons for this, such as the fact that blocking futures need to run on a blocking executor, and polling the blocking futures on the spawning thread would effectively deadlock the thread. Generally if you spawn something in a task you don't want to block the current future on it. Also it's generally a bad idea to use |
Beta Was this translation helpful? Give feedback.
-
EDIT: I think what I'm expecting to happen is that "when my Now that I wrote that out, it occurs to me that this is probably unreasonable to expect without somehow notifying the executor "Hi, I have a new task, plz run it!". If I want a single-threaded executor, what is the correct way to spawn tasks from a Original Minddump Follows
Indeed, I don't want to want to block the current future from running. What I want is:
Without
|
Beta Was this translation helpful? Give feedback.
-
@notgull is correct re: "use run". My original main, which I didn't think to include, looked like this: fn main() {
let server = Server::new();
smol::block_on(server.main_loop());
} It should've looked like this: fn main() {
let server = Server::new();
let ex = Rc::new(LocalExecutor::new());
smol::block_on(ex.run(server.main_loop(ex.clone())));
} I.e. "I need to pass a reference to the The remaining code is also basically identical to above, except accommodating the passed-down pub async fn main_loop(self, ex: Rc<LocalExecutor<'_>>) -> Result<(), Error> {
let default_addr: SocketAddr = "0.0.0.0:12000".parse().unwrap();
let socket = UdpSocket::bind(self.addr.unwrap_or(default_addr)).await?;
let mut buf = vec![0u8; 1024];
let mut dispatch = Dispatch::<Context, Error, 16>::new(Context::new(&ex, socket.clone()));
dispatch.add_handler::<EchoEndpoint>(echo_handler).map_err(|s| Error::Init(s))?;
loop {
let (n, addr) = socket.recv_from(&mut buf).await?;
dispatch.context().addr = Some(addr);
match dispatch.dispatch(&mut buf[..n]) {
Ok(_) => {},
Err(e) => { todo!("Need to handle error: {:?}", e) },
}
}
} fn echo_handler<'ex, 'b>(hdr: &WireHeader, ctx: &mut Context<'ex, 'b>, bytes: &[u8]) -> Result<(), Error> {
let Context { ex, sock, addr , .. } = ctx;
let addr = addr.unwrap().clone();
let sock = sock.clone();
// let ctx = ctx.clone();
match postcard::from_bytes::<Echo>(bytes) {
Ok(msg) => {
ex.spawn(echo_task(ex.clone(), hdr.seq_no, hdr.key, (sock, addr), msg.0)).detach();
Ok(())
},
Err(e) => {
Err(Error::Parse(e))
}
}
} async fn echo_task<'a, 'ex>(_ex: Rc<LocalExecutor<'ex>>, seq_no: u32, key: Key, (sock, addr): (UdpSocket, SocketAddr), msg: String) {
let resp = EchoResponse(msg.to_uppercase());
let mut buf = vec![0u8; 1024];
if let Ok(used) = postcard_rpc::headered::to_slice_keyed(seq_no, key, &resp, &mut buf) {
let _ = sock.send_to(used, addr).await;
}
} My original Also credit to this comment for pointing out the issue once I stared at the code snippet long enough :). |
Beta Was this translation helpful? Give feedback.
Tasks do not run except inside of a scheduler. There are many reasons for this, such as the fact that blocking futures need to run on a blocking executor, and polling the blocking futures on the spawning thread would effectively deadlock the thread. Generally if you spawn something in a task you don't want to block the current future on it.
Also it's generally a bad idea to use
try_tick
, andrun()
should be used in most cases. See smol-rs/async-executor#88