diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs
index d4ebad7d684..e48925b497e 100644
--- a/tokio/src/sync/broadcast.rs
+++ b/tokio/src/sync/broadcast.rs
@@ -825,17 +825,14 @@ impl<T> Sender<T> {
     ///     let (tx, mut rx1) = broadcast::channel::<u32>(16);
     ///     let mut rx2 = tx.subscribe();
     ///
-    ///     tokio::spawn(async move {
-    ///         assert_eq!(rx1.recv().await.unwrap(), 10);
-    ///     });
-    ///
     ///     let _ = tx.send(10);
-    ///     assert!(tx.closed().now_or_never().is_none());
     ///
-    ///     let _ = tokio::spawn(async move {
-    ///         assert_eq!(rx2.recv().await.unwrap(), 10);
-    ///     }).await;
+    ///     assert_eq!(rx1.recv().await.unwrap(), 10);
+    ///     drop(rx1);
+    ///     assert!(tx.closed().now_or_never().is_none());
     ///
+    ///     assert_eq!(rx2.recv().await.unwrap(), 10);
+    ///     drop(rx2);
     ///     assert!(tx.closed().now_or_never().is_some());
     /// }
     /// ```
diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs
index 1167c298830..ca70c20686b 100644
--- a/tokio/src/sync/mpsc/chan.rs
+++ b/tokio/src/sync/mpsc/chan.rs
@@ -490,10 +490,34 @@ impl<T, S: Semaphore> Drop for Rx<T, S> {
         self.inner.rx_fields.with_mut(|rx_fields_ptr| {
             let rx_fields = unsafe { &mut *rx_fields_ptr };
 
+            struct Guard<'a, T, S: Semaphore> {
+                list: &'a mut list::Rx<T>,
+                tx: &'a list::Tx<T>,
+                sem: &'a S,
+            }
+
+            impl<'a, T, S: Semaphore> Guard<'a, T, S> {
+                fn drain(&mut self) {
+                    // call T's destructor.
+                    while let Some(Value(_)) = self.list.pop(self.tx) {
+                        self.sem.add_permit();
+                    }
+                }
+            }
-            while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) {
-                self.inner.semaphore.add_permit();
+
+            impl<'a, T, S: Semaphore> Drop for Guard<'a, T, S> {
+                fn drop(&mut self) {
+                    self.drain();
+                }
             }
+
+            let mut guard = Guard {
+                list: &mut rx_fields.list,
+                tx: &self.inner.tx,
+                sem: &self.inner.semaphore,
+            };
+
+            guard.drain();
         });
     }
 }
diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs
index 638ced588ce..577e9c35faa 100644
--- a/tokio/tests/sync_mpsc.rs
+++ b/tokio/tests/sync_mpsc.rs
@@ -1454,4 +1454,50 @@ async fn test_is_empty_32_msgs() {
     }
 }
 
+#[test]
+#[cfg(not(panic = "abort"))]
+fn drop_all_elements_during_panic() {
+    use std::sync::atomic::AtomicUsize;
+    use std::sync::atomic::Ordering::Relaxed;
+    use tokio::sync::mpsc::UnboundedReceiver;
+    use tokio::sync::mpsc::UnboundedSender;
+
+    static COUNTER: AtomicUsize = AtomicUsize::new(0);
+
+    struct A(bool);
+    impl Drop for A {
+        // cause a panic when the inner value is `true`.
+        fn drop(&mut self) {
+            COUNTER.fetch_add(1, Relaxed);
+            if self.0 {
+                panic!("panic!")
+            }
+        }
+    }
+
+    fn func(tx: UnboundedSender<A>, rx: UnboundedReceiver<A>) {
+        tx.send(A(true)).unwrap();
+        tx.send(A(false)).unwrap();
+        tx.send(A(false)).unwrap();
+
+        drop(rx);
+
+        // `mpsc::Rx`'s drop is called and panics while dropping the first value,
+        // but it keeps dropping the remaining elements.
+    }
+
+    let (tx, rx) = mpsc::unbounded_channel();
+
+    let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+        func(tx.clone(), rx);
+    }));
+
+    // all of `A`'s destructors should have run at this point, even before
+    // `mpsc::Chan`'s drop gets called.
+    assert_eq!(COUNTER.load(Relaxed), 3);
+
+    drop(tx);
+    // `mpsc::Chan`'s drop is called, freeing the `Block` memory allocation.
+}
+
 fn is_debug<T: fmt::Debug>(_: &T) {}
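
Note (editor's addition, not part of the patch): the chan.rs change is an instance of the general drop-guard idiom. The old `Drop for Rx` stopped draining as soon as one element's destructor panicked, so the destructors of the remaining queued elements never ran. Wrapping the drain loop in a guard whose own `Drop` re-runs the loop means that if `drain` unwinds partway through, the guard's destructor resumes draining during unwinding. Below is a minimal, self-contained sketch of the same idiom on a plain `VecDeque`, outside tokio's internals; the names `Noisy` and `DrainGuard` are illustrative, not tokio APIs.

```rust
use std::collections::VecDeque;
use std::panic::{catch_unwind, AssertUnwindSafe};

/// An element whose destructor optionally panics: (label, panic_on_drop).
struct Noisy(&'static str, bool);

impl Drop for Noisy {
    fn drop(&mut self) {
        println!("dropping {}", self.0);
        if self.1 {
            panic!("destructor panicked");
        }
    }
}

/// Drains the queue; its own `Drop` resumes draining if a panic
/// interrupted an earlier call to `drain`.
struct DrainGuard<'a> {
    queue: &'a mut VecDeque<Noisy>,
}

impl DrainGuard<'_> {
    fn drain(&mut self) {
        // Each popped element is dropped at the end of the loop body.
        while let Some(_item) = self.queue.pop_front() {}
    }
}

impl Drop for DrainGuard<'_> {
    fn drop(&mut self) {
        // This also runs during unwinding, so the elements after the
        // panicking one are still dropped exactly once.
        self.drain();
    }
}

fn main() {
    let mut queue = VecDeque::from([
        Noisy("a", true), // its destructor panics
        Noisy("b", false),
        Noisy("c", false),
    ]);

    let result = catch_unwind(AssertUnwindSafe(|| {
        let mut guard = DrainGuard { queue: &mut queue };
        guard.drain(); // panics while dropping "a"
        // unwinding drops `guard`, which drains "b" and "c"
    }));

    assert!(result.is_err());
    assert!(queue.is_empty()); // nothing was leaked
}
```

As in the patched `Rx::drop`, a second panic from a later destructor while the guard itself is running during unwinding would be a panic-within-panic and abort the process. Relatedly, the regression test is gated with `#[cfg(not(panic = "abort"))]` because `catch_unwind` cannot catch anything when builds use `panic = "abort"`.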