diff --git a/examples/server.rs b/examples/server.rs index 67ef971..96507f9 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -129,9 +129,10 @@ async fn proxy_pull_worker(source: &str, app: &str, stream: &str, cancel: Cancel .stream(stream) .add_option("rtp_type", "0") .build(); - let (tx, rx) = tokio::sync::oneshot::channel::(); + let poll_cancel = CancellationToken::new(); + let poll_cancel_clone = poll_cancel.clone(); player.on_close(move |_, _, _| { - let _ = tx.send(String::from("")); + poll_cancel_clone.cancel(); }); player.play(&source); @@ -140,7 +141,7 @@ async fn proxy_pull_worker(source: &str, app: &str, stream: &str, cancel: Cancel _ = cancel.cancelled() => { break; }, - _ = rx => { + _ = poll_cancel.cancelled() => { // todo retry break; } diff --git a/src/lib.rs b/src/lib.rs index 21851a8..b36a4c2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,4 +40,7 @@ macro_rules! box_to_mut_void_ptr { ($a:ident) => { Box::into_raw(Box::new($a)) as *mut _ }; + ($a:expr) => { + Box::into_raw(Box::new($a)) as *mut _ + }; } diff --git a/src/player.rs b/src/player.rs index 36fa56c..f0185ba 100644 --- a/src/player.rs +++ b/src/player.rs @@ -28,8 +28,14 @@ impl ProxyPlayer { unsafe { mk_proxy_player_total_reader_count(self.0) } } - pub fn on_close(&self, cb: impl FnOnce(i32, &str, i32) + 'static) { - let cb = Box::new(cb); + pub fn on_close(&self, cb: T) + where + T: FnMut(i32, String, i32) + 'static, + { + self.on_close_inner(Box::new(cb)) + } + + fn on_close_inner(&self, cb: OnCloseCallbackFn) { unsafe { mk_proxy_player_set_on_close( self.0, @@ -130,7 +136,7 @@ impl ProxyPlayerBuilder { } } -pub type OnCloseCallbackFn = Box; +pub type OnCloseCallbackFn = Box; extern "C" fn proxy_player_on_close( user_data: *mut ::std::os::raw::c_void, err: ::std::os::raw::c_int, @@ -139,6 +145,6 @@ extern "C" fn proxy_player_on_close( ) { unsafe { let cb: &mut OnCloseCallbackFn = std::mem::transmute(user_data); - cb(err, const_ptr_to_string!(what).as_str(), sys_err); + cb(err, const_ptr_to_string!(what), sys_err); }; } diff --git a/src/pusher.rs b/src/pusher.rs index 17b96af..0751bfa 100644 --- a/src/pusher.rs +++ b/src/pusher.rs @@ -28,11 +28,25 @@ impl Pusher { unsafe { mk_pusher_publish(self.0, url.as_ptr()) } } - pub fn on_result(&self, cb: OnEventCallbackFn) { + pub fn on_result(&self, cb: T) + where + T: FnMut(i32, String) + 'static, + { + self.on_result_inner(Box::new(cb)) + } + + fn on_result_inner(&self, cb: OnEventCallbackFn) { unsafe { mk_pusher_set_on_result(self.0, Some(on_push_event), box_to_mut_void_ptr!(cb)) } } - pub fn on_shutdown(&self, cb: OnEventCallbackFn) { + pub fn on_shutdown(&self, cb: T) + where + T: FnMut(i32, String) + 'static, + { + self.on_shutdown_inner(Box::new(cb)) + } + + pub fn on_shutdown_inner(&self, cb: OnEventCallbackFn) { unsafe { mk_pusher_set_on_result(self.0, Some(on_push_event), box_to_mut_void_ptr!(cb)) } } } @@ -93,7 +107,7 @@ impl PusherBuilder { } } -pub type OnEventCallbackFn = Box; +pub type OnEventCallbackFn = Box; extern "C" fn on_push_event( user_data: *mut ::std::os::raw::c_void, err_code: ::std::os::raw::c_int, @@ -101,6 +115,6 @@ extern "C" fn on_push_event( ) { unsafe { let cb: &mut OnEventCallbackFn = std::mem::transmute(user_data); - cb(err_code, const_ptr_to_string!(err_msg).as_str()); + cb(err_code, const_ptr_to_string!(err_msg)); }; } diff --git a/src/server.rs b/src/server.rs index 3483a25..613b018 100644 --- a/src/server.rs +++ b/src/server.rs @@ -50,8 +50,14 @@ impl RtpServer { unsafe { mk_rtp_server_port(self.0) } } - pub fn on_detach(&self, cb: impl FnOnce() + 'static) { - let cb = Box::new(cb); + pub fn on_detach(&self, cb: T) + where + T: FnMut() + 'static, + { + self.on_detach_inner(Box::new(cb)); + } + + fn on_detach_inner(&self, cb: OnRtpServerDetachCallbackFn) { unsafe { mk_rtp_server_set_on_detach( self.0, @@ -61,8 +67,14 @@ impl RtpServer { } } - pub fn connect(&self, url: &str, dst_port: u16, cb: impl FnOnce(i32, &str, i32) + 'static) { - let cb = Box::new(cb); + pub fn connect(&self, url: &str, dst_port: u16, cb: T) + where + T: FnMut(i32, String, i32) + 'static, + { + self.connect_inner(url, dst_port, Box::new(cb)); + } + + fn connect_inner(&self, url: &str, dst_port: u16, cb: OnRtpServerConnectedCallbackFn) { let url = const_str_to_ptr!(url); unsafe { mk_rtp_server_connect( @@ -76,13 +88,15 @@ impl RtpServer { } } +type OnRtpServerDetachCallbackFn = Box; extern "C" fn on_rtp_server_detach(user_data: *mut ::std::os::raw::c_void) { unsafe { - let cb: &mut Box = std::mem::transmute(user_data); + let cb: &mut OnRtpServerDetachCallbackFn = std::mem::transmute(user_data); cb(); }; } +type OnRtpServerConnectedCallbackFn = Box; extern "C" fn on_rtp_server_connected( user_data: *mut ::std::os::raw::c_void, err: ::std::os::raw::c_int, @@ -90,8 +104,8 @@ extern "C" fn on_rtp_server_connected( sys_err: ::std::os::raw::c_int, ) { unsafe { - let cb: &mut Box = std::mem::transmute(user_data); - cb(err, const_ptr_to_string!(what).as_str(), sys_err); + let cb: &mut OnRtpServerConnectedCallbackFn = std::mem::transmute(user_data); + cb(err, const_ptr_to_string!(what), sys_err); }; }