
Migrate to async traits for handling event callbacks #522

Open

w-utter opened this issue Jan 1, 2024 · 2 comments

w-utter commented Jan 1, 2024

With the stabilization of async fn in traits as of Rust 1.75, I think it would be more ergonomic to have a single piece of state implementing a trait that handles all events, rather than having to register each callback individually; this would also reduce heap allocation. Default implementations for the trait methods also allow the same functionality as before, as they all return impl Future<Output = ()>.

Below is a basic example of what I'm suggesting (keep in mind I have not tested any of the code below):

trait PeerConnectionEventHandle {

    async fn on_ice_connection_state_change(&mut self, connection_state: RTCIceConnectionState) { }

    async fn on_signaling_state_change(&mut self, signaling_state: RTCSignalingState) { }

    async fn on_data_channel(&mut self, data_channel: Arc<RTCDataChannel>) { }

    async fn on_negotiation_needed(&mut self) { }

    async fn on_ice_candidate(&mut self, ice_candidate: Option<RTCIceCandidate>) { }

    async fn on_ice_gathering_state_change(&mut self, gatherer_state: RTCIceGathererState) { }

    async fn on_track(&mut self, track: Arc<TrackRemote>, receiver: Arc<RTCRtpReceiver>, transceiver: Arc<RTCRtpTransceiver>) { }

    async fn on_peer_connection_state_change(&mut self, peer_connection_state: RTCPeerConnectionState) { }

}

changing the implementation to something like:

struct PeerConnectionInternal {
    /** keeping all other fields as before except removing the other event handlers **/
    event_handlers: Arc<ArcSwapOption<Mutex<dyn PeerConnectionEventHandle>>>,
}

impl RTCPeerConnection {
    /** keeping all other fns as before except removing the on_* fns **/
    fn with_handler(&self, handler: impl PeerConnectionEventHandle) {
       self.internal.event_handlers.store(Some(Arc::new(Mutex::new(handler))))
    }
}
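One wrinkle worth flagging: a trait with async fn methods is not object-safe in Rust 1.75, so Mutex<dyn PeerConnectionEventHandle> would not compile as written. Below is a minimal sketch of the dispatch side that sidesteps this by making the internal state generic over the handler; the fire_* method name is hypothetical and stands in for wherever the stored callbacks are currently invoked:

use std::sync::Arc;
use arc_swap::ArcSwapOption;
use tokio::sync::Mutex;

struct PeerConnectionInternalGeneric<H: PeerConnectionEventHandle> {
    event_handlers: Arc<ArcSwapOption<Mutex<H>>>,
}

impl<H: PeerConnectionEventHandle> PeerConnectionInternalGeneric<H> {
    // Hypothetical internal trigger: load the single registered handler
    // (if any) and await its trait method, instead of invoking a
    // per-event boxed callback.
    async fn fire_ice_connection_state_change(&self, state: RTCIceConnectionState) {
        if let Some(handler) = self.event_handlers.load_full() {
            handler.lock().await.on_ice_connection_state_change(state).await;
        }
    }
}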

and the play-from-disk-h264 example would change to something like:

struct PeerHandler {
    done_tx: tokio::sync::mpsc::Sender<()>,
    notify_tx: Arc<Notify>,
}

impl PeerConnectionEventHandle for PeerHandler {
    // Set the handler for Peer connection state
    // This will notify you when the peer has connected/disconnected
    async fn on_peer_connection_state_change(&mut self, peer_connection_state: RTCPeerConnectionState) {
        println!("Peer Connection State has changed: {peer_connection_state}");
        
        if peer_connection_state == RTCPeerConnectionState::Failed {
            // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
            // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
            // Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
            println!("Peer Connection has gone to failed exiting");
            let _ = self.done_tx.try_send(());
        }
    }
    
    // Set the handler for ICE connection state
    // This will notify you when the peer has connected/disconnected
    async fn on_ice_connection_state_change(&mut self, connection_state: RTCIceConnectionState) {
        println!("Connection State has changed {connection_state}");
        if connection_state == RTCIceConnectionState::Connected {
            self.notify_tx.notify_waiters();
        }
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    /** everything above is the same **/
    let notify_tx = Arc::new(Notify::new());
    let notify_video = notify_tx.clone();
    let notify_audio = notify_tx.clone();

    let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1);
    let video_done_tx = done_tx.clone();
    let audio_done_tx = done_tx.clone();

    // Create a new RTCPeerConnection
    let peer_connection = Arc::new(api.new_peer_connection(config).await?);
    peer_connection.with_handler(PeerHandler { notify_tx, done_tx });
    /** everything below is the same except removing the on_* callbacks **/
}

I'll follow this up with a PR, but given the scope of how much needs to change, it might take a while. Also, as this is a very new language feature, I wanted to see if there would be any discussion first.


w-utter commented Jan 5, 2024

As I've been working on this, I realized that each fn in the trait has to return impl Future<Output = ()> + Send, since we're dealing with multiple threads, and that bound can't be expressed with a bare async fn. Regardless, an async fn is just sugar for an RPITIT fn, and RPITIT is also stabilized in 1.75; the fn signatures just don't look as clean.
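Concretely, each method in the trait would be spelled in its desugared RPITIT form, along these lines (a sketch only, reusing the trait from the issue description):

use std::future::Future;

trait PeerConnectionEventHandle {
    // A bare async fn can't express the + Send bound on the returned
    // future, so the desugared RPITIT form is written out instead.
    fn on_ice_connection_state_change(
        &mut self,
        _connection_state: RTCIceConnectionState,
    ) -> impl Future<Output = ()> + Send {
        async {}
    }
}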


Kleptine commented Apr 28, 2025

I was looking into something similar the last day or two. Did anything come of your work here?

I like the trait approach here, actually. I have two other alternatives:

  1. Pass 'static async closures to each handler.

This is the simplest option, and can actually be implemented as a wrapper around the peer. I've done this for the APIs I use in my project.

use std::future::Future;
use std::sync::Arc;
use tracing::{info_span, Instrument};

pub struct AsyncRTCPeer {
    pub peer: RTCPeerConnection,
}

impl AsyncRTCPeer {
    pub fn on_track<X, Fut>(&self, mut handler: X)
    where
        X: FnMut(Arc<TrackRemote>, Arc<RTCRtpReceiver>, Arc<RTCRtpTransceiver>) -> Fut
            + Send
            + Sync
            + 'static,
        Fut: Future<Output = ()> + Send + 'static,
    {
        // Forward to the inner peer, boxing the future produced by the
        // wrapped closure.
        self.peer.on_track(Box::new(move |a, b, c| {
            let fut = handler(a, b, c).instrument(info_span!("Peer"));
            Box::pin(async move { fut.await })
        }));
    }
}

// Usage:
peer.on_track(async |_track, _, _| {
    info!("New track added");
});

This is fairly clean, but has the major drawback that every event callback must be 'static! This is really a limitation of the underlying webrtc-rs implementation, because it ends up calling tokio::spawn with each future and driving them manually.
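For reference, the handler type being adapted above is roughly this shape (paraphrased from the Box::new/Box::pin calls; not the crate's exact definition), which is where the 'static requirement comes from:

use std::{future::Future, pin::Pin, sync::Arc};

// Roughly the shape webrtc-rs expects: a boxed closure returning a boxed
// future, both 'static, because the library spawns and drives the future
// itself rather than letting the caller await it.
type OnTrackFn = Box<
    dyn FnMut(
            Arc<TrackRemote>,
            Arc<RTCRtpReceiver>,
            Arc<RTCRtpTransceiver>,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>
        + Send
        + Sync,
>;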

  2. Turn the peer into a 'driven' future.

Currently webrtc-rs owns and pumps all of the futures that handle events. Most frameworks, like Axum, let the user handle this responsibility, e.g.:

let app = Router::new().route("/", get(handler));
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
    .await
    .unwrap();
axum::serve(listener, app).await.unwrap();

It might be a nicer interface to allow the peer to be managed and pumped by the user, which would allow for much more control over the lifetime of the peer, general execution, etc.

This might look something like:

let peer = api.peer()
    .on_track(async |_track, _, _| { info!("Got a track!"); })
    .on_connection_changed(async |_state| { info!("Connection changed."); })
    .build();

let result: Result<(), PeerError> = peer.run().await;

Crucially, I believe this would allow removing the 'static bounds from callbacks, letting closures capture references to their environment, e.g.:

use std::sync::atomic::{AtomicI8, Ordering};

let track_num = AtomicI8::new(0);
let peer = api.peer()
    .on_track(async |_track, _, _| { info!("Got a track! [{}]", track_num.fetch_add(1, Ordering::SeqCst)); })
    .build();

let result: Result<(), PeerError> = peer.run().await;

Axum uses a Service-layer trait to encapsulate the event handling when building things this way; in that sense, it'd actually be pretty similar to your PeerConnectionEventHandle trait!
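To make the lifetime point concrete, here is a rough sketch of the run-loop idea; the PeerEvent enum and channel plumbing are made up for illustration. Because the caller awaits run() on its own task instead of handing each future to tokio::spawn, the dispatch can borrow from the caller's stack:

use std::sync::Arc;
use std::sync::atomic::{AtomicI8, Ordering};

// Hypothetical internal event stream that run() would drain.
enum PeerEvent {
    Track(Arc<TrackRemote>),
    ConnectionChanged(RTCPeerConnectionState),
}

async fn run(mut events: tokio::sync::mpsc::Receiver<PeerEvent>, track_num: &AtomicI8) {
    // Events are dispatched on the caller's task, so track_num can be a
    // plain borrow; nothing here needs to be 'static.
    while let Some(event) = events.recv().await {
        match event {
            PeerEvent::Track(_track) => {
                println!("Got a track! [{}]", track_num.fetch_add(1, Ordering::SeqCst));
            }
            PeerEvent::ConnectionChanged(state) => {
                println!("Connection changed: {state}");
            }
        }
    }
}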

But this would be a much more complex interface to design, and I'm not an expert at Rust async.
