This repository has been archived by the owner on Oct 26, 2022. It is now read-only.

netlink-proto: handle ENOBUFS errors #139

Open
little-dude opened this issue Jan 17, 2021 · 6 comments

@little-dude
Owner

little-dude commented Jan 17, 2021

To quote the netlink(7) man page:

However, reliable transmissions from kernel to user are impossible in any case. The kernel can't send a netlink message if the socket buffer is full: the message will be dropped and the kernel and the user-space process will no longer have the same view of kernel state. It is up to the application to detect when this happens (via the ENOBUFS error returned by recvmsg(2)) and resynchronize.
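For reference, at the raw socket level this condition only shows up as an errno on the receive call. A minimal, hypothetical detection sketch using the libc crate (this is not netlink-proto code; `fd` is assumed to be an already-bound AF_NETLINK socket file descriptor):

use std::io;

// Minimal sketch: detect an ENOBUFS overrun on a raw netlink socket.
fn read_datagram(fd: libc::c_int, buf: &mut [u8]) -> io::Result<usize> {
    let n = unsafe { libc::recv(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len(), 0) };
    if n < 0 {
        let err = io::Error::last_os_error();
        if err.raw_os_error() == Some(libc::ENOBUFS) {
            // The kernel dropped messages because our receive buffer was full:
            // the socket is still usable, but our view of kernel state is now
            // stale and must be resynchronized (e.g. by re-dumping it).
        }
        return Err(err);
    }
    Ok(n as usize)
}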

Currently NetlinkFramed will just return Poll::Ready(None), which we interpret as the socket shutting down in Connection:

    pub fn poll_read_messages(&mut self, cx: &mut Context) {
        trace!("poll_read_messages called");
        let mut socket = Pin::new(&mut self.socket);

        loop {
            trace!("polling socket");
            match socket.as_mut().poll_next(cx) {
                Poll::Ready(Some((message, addr))) => {
                    trace!("read datagram from socket");
                    self.protocol.handle_message(message, addr);
                }
                Poll::Ready(None) => {
                    warn!("netlink socket stream shut down");
                    self.socket_closed = true;
                    return;
                }
                Poll::Pending => {
                    trace!("no datagram read from socket");
                    return;
                }
            }
        }
    }
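For illustration, a hedged sketch of what this loop could look like if the socket stream yielded io::Result items instead of ending on ENOBUFS (the Result-yielding stream and the notify_overrun hook are hypothetical, not the current API):

loop {
    match socket.as_mut().poll_next(cx) {
        Poll::Ready(Some(Ok((message, addr)))) => {
            self.protocol.handle_message(message, addr);
        }
        // Hypothetical: an ENOBUFS overrun is reported as an error item, so it
        // can be told apart from the stream genuinely ending.
        Poll::Ready(Some(Err(e))) if e.raw_os_error() == Some(libc::ENOBUFS) => {
            warn!("netlink receive buffer overrun, state must be resynced");
            self.notify_overrun(); // hypothetical hook towards the consumer
        }
        Poll::Ready(Some(Err(e))) => {
            warn!("netlink socket error: {}", e);
            self.socket_closed = true;
            return;
        }
        Poll::Ready(None) => {
            warn!("netlink socket stream shut down");
            self.socket_closed = true;
            return;
        }
        Poll::Pending => return,
    }
}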
@idanski
Contributor

idanski commented Jan 18, 2021

I actually encountered this with audit recently!
So I'm very interested in this feature; I was thinking of suggesting some way to handle it but didn't get around to it.

Let me know if I can help somehow. 🙏

@little-dude
Owner Author

Oh, sorry that you ran into it. I think at the moment the only way around it is not to use netlink-proto, to be honest. I'm not exactly sure how I'm going to tackle this yet, because this issue also relates to the refactoring I mentioned in https://docs.rs/tokio/1.0.2/tokio/io/unix/struct.AsyncFdReadyGuard.html#method.clear_ready

Would you mind explaining in more detail what you're doing to trigger this? And if you were able to catch the ENOBUFS errors, how would you handle them, exactly?

@idanski
Contributor

idanski commented Jan 19, 2021

Sure, no problem.

Recreating this is not easy, but I ran audit in multicast mode and enabled other products that listen on auditd.
On some machines the multicast socket sometimes received ENOBUFS and the stream was indeed consumed.

To test, I forked your repo and ignored ENOBUFS, printing a log instead, and I saw that it kept working well and the logs were printed.
I thought maybe the correct way of handling it would be either to pass NetlinkFramed a list of error codes to ignore, or to somehow return the error from the stream (maybe as a wrapped Result<Message, NetlinkError>); I haven't really looked into it yet.
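To make that second idea concrete, here is a rough sketch of what a consumer could look like if the stream yielded io::Result items (the item type and the resync step are assumptions, not the current API; libc is only used for the errno constant):

use std::io;
use futures::{Stream, StreamExt};

// Hypothetical: the unsolicited-message stream yields io::Result<T> instead
// of plain messages, so an ENOBUFS overrun reaches the application.
async fn consume<T, S>(mut messages: S)
where
    S: Stream<Item = io::Result<T>> + Unpin,
    T: std::fmt::Debug,
{
    while let Some(item) = messages.next().await {
        match item {
            Ok(msg) => println!("got {:?}", msg),
            Err(e) if e.raw_os_error() == Some(libc::ENOBUFS) => {
                // Messages were dropped: re-dump the kernel state here instead
                // of assuming the local view is still accurate.
                eprintln!("overrun, resyncing");
            }
            Err(e) => {
                eprintln!("fatal netlink error: {}", e);
                break;
            }
        }
    }
}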

Again thanks for the awesome crate and let me know if I can help in any way.

@Tuetuopay
Contributor

I happen to be hitting this too, and I'm working on a workaround. If you need a way to reproduce it, it's very easy to trigger by adding a lot of neighbours to a link and then deleting the link:

use futures::stream::{StreamExt, TryStreamExt};
use rtnetlink::packet::{constants::NUD_INCOMPLETE, link::nlas::Nla};
use rtnetlink::{constants::RTMGRP_NEIGH, new_connection, sys::SocketAddr};
use std::net::Ipv4Addr;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (conn, rtnl, _) = new_connection().expect("connection failed");
    tokio::task::spawn(conn);

    // Cleanup potentially stale bridge device
    let mut req = rtnl.link().del(0);
    req.message_mut().nlas.push(Nla::IfName("br0".to_owned()));
    let _ = req.execute().await;

    // Create a bridge device
    rtnl.link().add().bridge("br0".to_owned()).execute().await.expect("failed to add br0");
    println!("created br0");

    let bridge = rtnl.link().get().set_name_filter("br0".to_owned()).execute().try_next().await
        .expect("failed to get br0")
        .expect("br0 not found");
    let bridge = bridge.header.index;

    // Add an IP on it
    rtnl.address().add(bridge, [169, 254, 0, 1].into(), 16).execute().await
        .expect("failed to add 169.254.0.1/16 to br0");

    // Add LOADS of neighbours on it
    for ip_end in 2..500 {
        let ip: Ipv4Addr = (169u32 << 24 | 254 << 16 | ip_end).into();

        rtnl.neighbours().add(bridge, ip.into()).state(NUD_INCOMPLETE).execute().await
            .expect("failed to add neigh");
    }

    // Now subscribe to the neighbours multicast group, and spawn a (simple) handler task
    let (mut conn, _, mut stream) = new_connection().expect("connection failed");
    conn.socket_mut().bind(&SocketAddr::new(0, RTMGRP_NEIGH))
        .expect("failed to bind to neigh multicast group");

    tokio::task::spawn(async move {
        conn.await;
        println!("CONNECTION TASK EXITED!");
    });
    tokio::task::spawn(async move {
        while let Some((msg, _)) = stream.next().await {
            println!("Got msg {:?}", msg);
        }
    });

    // Now delete the bridge, which generates a LOT of neighbour notifications (all of them will be
    // deleted at once)
    rtnl.link().del(bridge).execute().await.expect("failed to delete br0");

    println!("press ^C to exit...");
    tokio::signal::ctrl_c().await.unwrap();
}

I'm using a single-threaded Tokio runtime to accentuate the effect, but it still triggers on the multi-threaded runtime (which actually helps, since I'm spawning the conn futures in a new task).

The way I'm planning to work around it is to bubble ENOBUFS up to the stream consumer (with results maybe, as suggested by @idanski), so the application can decide how to handle such events. Usually, that means dumping the kernel's contents again to update some form of cache or resync some internal data; not something that can easily be done by the library*.

Application-wise, I think I'll add a second layer of streams to my app, with a proxy task that'll constantly reopen a socket when it happens to overflow, and tell my handlers "hey, we missed some messages, please resync!".

(*) Actually, there's a pretty schmick way to handle this that would be a great addition, albeit quite involved: managed caches, like libnl does. The events sent to the application over the channel are not directly from the kernel per se, but rather come from the changes applied to the cache. Either we get a message from the kernel, update the cache, and pass the event along; or we get ENOBUFS, dump the kernel state, diff it with the cache, and send the changes to the stream.
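For what it's worth, here is a rough sketch of that proxy-task workaround against the current behaviour (the Event enum and the exact import paths are illustrative and may differ between rtnetlink versions; error handling is kept minimal):

use futures::StreamExt;
use rtnetlink::{
    constants::RTMGRP_NEIGH,
    new_connection,
    packet::{NetlinkMessage, RtnlMessage},
    sys::SocketAddr,
};
use tokio::sync::mpsc;

// What the proxy forwards to the application's handlers.
enum Event {
    Message(NetlinkMessage<RtnlMessage>),
    // The subscription socket overflowed and was reopened: whatever state the
    // handler mirrors must be re-dumped from the kernel.
    Resync,
}

// With the current behaviour an ENOBUFS overrun ends the stream, so this task
// reopens a subscription socket every time that happens and asks the consumer
// to resynchronize.
async fn subscription_proxy(tx: mpsc::UnboundedSender<Event>) {
    loop {
        let (mut conn, _handle, mut messages) =
            new_connection().expect("failed to open netlink socket");
        conn.socket_mut()
            .bind(&SocketAddr::new(0, RTMGRP_NEIGH))
            .expect("failed to join the neigh multicast group");
        tokio::task::spawn(conn);

        while let Some((msg, _addr)) = messages.next().await {
            if tx.send(Event::Message(msg)).is_err() {
                return; // the consumer went away, stop the proxy
            }
        }

        // The stream ended: with today's behaviour this most likely means an
        // ENOBUFS overrun, so reopen the socket and ask for a resync.
        if tx.send(Event::Resync).is_err() {
            return;
        }
    }
}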

@little-dude
Owner Author

little-dude commented Dec 19, 2021

So I'm starting to think that the current behavior is not too horrible: the socket goes out of sync, so we close the connection. But yeah, we should at least give the option not to do that.

I think the caches technique is a bit complex to implement, and I'm afraid it'll be protocol dependent.

The two suggestions that remain are:

  • propagating the error through the stream of unsolicited messages (or we could have a dedicated stream for it)
  • allowing the users to configure the connection to ignore these errors

The two suggestions are actually complementary: we could make the connection configurable, with three different configs:

  • propagate the errors to the stream to let the user decide how they want to handle the error
  • ignore this error, perhaps just log a warning in the codec
  • close the connection (current behavior)

If we decide to ignore the error, perhaps we should at least drop the in-flight requests though, because it's possible that we've missed the response to one or several of them, and they'll remain in the connection forever as per #138
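A minimal sketch of what such a knob could look like (the name and variants are made up for illustration, not an existing netlink-proto API):

/// Hypothetical per-connection policy for kernel-side buffer overruns (ENOBUFS).
pub enum OverrunPolicy {
    /// Current behaviour: treat the overrun as the end of the stream and close
    /// the connection.
    Close,
    /// Log a warning in the codec and keep reading. In-flight requests should
    /// probably be failed too, since their responses may have been dropped (#138).
    Ignore,
    /// Forward the error to the stream of unsolicited messages (or a dedicated
    /// error stream) and let the application resynchronize.
    Propagate,
}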

@Tuetuopay
Contributor

The current behaviour is definitely workable, if inefficient. The way I'm currently working around this issue is by having my own intermediate task that forwards messages to my own stream, and that constantly reopens a socket when it "closes".

I do agree on the cache, it's a bit too involved. I think it can be done, but I don't have the bandwidth right now to do it. I only mentioned it as "in a perfect world we'd do this".

Those two options indeed complement each other:

  • change the stream to yield Result<NetlinkMessage<_>, std::io::Error>
  • configure the connection to never yield Err(_), or, if we want finer-grained control, accept some kind of error mask to silently-ish (warn log) swallow specific kinds of errors instead of yielding them (see the sketch below). I didn't look at what non-fatal errors recvmsg can return; maybe that's completely useless and only ENOBUFS is of interest
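A small sketch of what such an error mask could look like (hypothetical; the codec would consult it before deciding whether to yield an error):

use std::collections::HashSet;
use std::io;

/// Hypothetical codec-level filter: raw errno values that are logged and
/// swallowed instead of being yielded to the consumer.
pub struct ErrorMask {
    ignored: HashSet<i32>,
}

impl ErrorMask {
    pub fn new(ignored: impl IntoIterator<Item = i32>) -> Self {
        Self {
            ignored: ignored.into_iter().collect(),
        }
    }

    /// Returns true if the error should only be logged, not yielded.
    pub fn should_ignore(&self, err: &io::Error) -> bool {
        err.raw_os_error()
            .map_or(false, |code| self.ignored.contains(&code))
    }
}

// e.g. ErrorMask::new([libc::ENOBUFS]) would give the "warn and keep going"
// behaviour discussed above.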

I don't have the bandwidth to do this before the new year sadly, but I'll try to have a crack at it as soon as possible.

As for the in-flight requests, that's why you should use two different sockets: one for request/response, and another for subscriptions. This avoids the whole issue altogether. I don't have a magic solution, but some way to enforce "use this connection either for req/res or for sub, but not both" might be a better way to do it. Maybe change create_connection() a bit? Something like:

fn create_connection() -> ConnectionBuilder;

impl ConnectionBuilder {
    fn connect(self) -> (impl Future<Output = ()>, Handle);
    fn subscribe(self, groups: u32) -> (impl Future<Output = ()>, UnboundedReceiver<_>);
}
