Skip to content

Commit

Permalink
修复callback异常
Browse files Browse the repository at this point in the history
  • Loading branch information
BenLocal committed Jul 4, 2024
1 parent f1d19d3 commit d843f6f
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 18 deletions.
7 changes: 4 additions & 3 deletions examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,10 @@ async fn proxy_pull_worker(source: &str, app: &str, stream: &str, cancel: Cancel
.stream(stream)
.add_option("rtp_type", "0")
.build();
let (tx, rx) = tokio::sync::oneshot::channel::<String>();
let poll_cancel = CancellationToken::new();
let poll_cancel_clone = poll_cancel.clone();
player.on_close(move |_, _, _| {
let _ = tx.send(String::from(""));
poll_cancel_clone.cancel();
});
player.play(&source);

Expand All @@ -140,7 +141,7 @@ async fn proxy_pull_worker(source: &str, app: &str, stream: &str, cancel: Cancel
_ = cancel.cancelled() => {
break;
},
_ = rx => {
_ = poll_cancel.cancelled() => {
// todo retry
break;
}
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,7 @@ macro_rules! box_to_mut_void_ptr {
($a:ident) => {
Box::into_raw(Box::new($a)) as *mut _
};
($a:expr) => {
Box::into_raw(Box::new($a)) as *mut _
};
}
14 changes: 10 additions & 4 deletions src/player.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,14 @@ impl ProxyPlayer {
unsafe { mk_proxy_player_total_reader_count(self.0) }
}

pub fn on_close(&self, cb: impl FnOnce(i32, &str, i32) + 'static) {
let cb = Box::new(cb);
pub fn on_close<T>(&self, cb: T)
where
T: FnMut(i32, String, i32) + 'static,
{
self.on_close_inner(Box::new(cb))
}

fn on_close_inner(&self, cb: OnCloseCallbackFn) {
unsafe {
mk_proxy_player_set_on_close(
self.0,
Expand Down Expand Up @@ -130,7 +136,7 @@ impl ProxyPlayerBuilder {
}
}

pub type OnCloseCallbackFn = Box<dyn FnMut(i32, &str, i32) + 'static>;
pub type OnCloseCallbackFn = Box<dyn FnMut(i32, String, i32) + 'static>;
extern "C" fn proxy_player_on_close(
user_data: *mut ::std::os::raw::c_void,
err: ::std::os::raw::c_int,
Expand All @@ -139,6 +145,6 @@ extern "C" fn proxy_player_on_close(
) {
unsafe {
let cb: &mut OnCloseCallbackFn = std::mem::transmute(user_data);
cb(err, const_ptr_to_string!(what).as_str(), sys_err);
cb(err, const_ptr_to_string!(what), sys_err);
};
}
22 changes: 18 additions & 4 deletions src/pusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,25 @@ impl Pusher {
unsafe { mk_pusher_publish(self.0, url.as_ptr()) }
}

pub fn on_result(&self, cb: OnEventCallbackFn) {
pub fn on_result<T>(&self, cb: T)
where
T: FnMut(i32, String) + 'static,
{
self.on_result_inner(Box::new(cb))
}

fn on_result_inner(&self, cb: OnEventCallbackFn) {
unsafe { mk_pusher_set_on_result(self.0, Some(on_push_event), box_to_mut_void_ptr!(cb)) }
}

pub fn on_shutdown(&self, cb: OnEventCallbackFn) {
pub fn on_shutdown<T>(&self, cb: T)
where
T: FnMut(i32, String) + 'static,
{
self.on_shutdown_inner(Box::new(cb))
}

pub fn on_shutdown_inner(&self, cb: OnEventCallbackFn) {
unsafe { mk_pusher_set_on_result(self.0, Some(on_push_event), box_to_mut_void_ptr!(cb)) }
}
}
Expand Down Expand Up @@ -93,14 +107,14 @@ impl PusherBuilder {
}
}

pub type OnEventCallbackFn = Box<dyn FnMut(i32, &str) + 'static>;
pub type OnEventCallbackFn = Box<dyn FnMut(i32, String) + 'static>;
extern "C" fn on_push_event(
user_data: *mut ::std::os::raw::c_void,
err_code: ::std::os::raw::c_int,
err_msg: *const ::std::os::raw::c_char,
) {
unsafe {
let cb: &mut OnEventCallbackFn = std::mem::transmute(user_data);
cb(err_code, const_ptr_to_string!(err_msg).as_str());
cb(err_code, const_ptr_to_string!(err_msg));
};
}
28 changes: 21 additions & 7 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,14 @@ impl RtpServer {
unsafe { mk_rtp_server_port(self.0) }
}

pub fn on_detach(&self, cb: impl FnOnce() + 'static) {
let cb = Box::new(cb);
pub fn on_detach<T>(&self, cb: T)
where
T: FnMut() + 'static,
{
self.on_detach_inner(Box::new(cb));
}

fn on_detach_inner(&self, cb: OnRtpServerDetachCallbackFn) {
unsafe {
mk_rtp_server_set_on_detach(
self.0,
Expand All @@ -61,8 +67,14 @@ impl RtpServer {
}
}

pub fn connect(&self, url: &str, dst_port: u16, cb: impl FnOnce(i32, &str, i32) + 'static) {
let cb = Box::new(cb);
pub fn connect<T>(&self, url: &str, dst_port: u16, cb: T)
where
T: FnMut(i32, String, i32) + 'static,
{
self.connect_inner(url, dst_port, Box::new(cb));
}

fn connect_inner(&self, url: &str, dst_port: u16, cb: OnRtpServerConnectedCallbackFn) {
let url = const_str_to_ptr!(url);
unsafe {
mk_rtp_server_connect(
Expand All @@ -76,22 +88,24 @@ impl RtpServer {
}
}

type OnRtpServerDetachCallbackFn = Box<dyn FnMut() + 'static>;
extern "C" fn on_rtp_server_detach(user_data: *mut ::std::os::raw::c_void) {
unsafe {
let cb: &mut Box<dyn FnMut() + 'static> = std::mem::transmute(user_data);
let cb: &mut OnRtpServerDetachCallbackFn = std::mem::transmute(user_data);
cb();
};
}

type OnRtpServerConnectedCallbackFn = Box<dyn FnMut(i32, String, i32) + 'static>;
extern "C" fn on_rtp_server_connected(
user_data: *mut ::std::os::raw::c_void,
err: ::std::os::raw::c_int,
what: *const ::std::os::raw::c_char,
sys_err: ::std::os::raw::c_int,
) {
unsafe {
let cb: &mut Box<dyn FnMut(i32, &str, i32) + 'static> = std::mem::transmute(user_data);
cb(err, const_ptr_to_string!(what).as_str(), sys_err);
let cb: &mut OnRtpServerConnectedCallbackFn = std::mem::transmute(user_data);
cb(err, const_ptr_to_string!(what), sys_err);
};
}

Expand Down

0 comments on commit d843f6f

Please sign in to comment.