From b6f090bcb0827f8d8d9a987da8e3dc1d6778892e Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Fri, 7 Feb 2025 17:46:17 -0500 Subject: [PATCH] fix broadcast group stream bug (#1613) --- .../src/subscriptions/stream_conversations.rs | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/xmtp_mls/src/subscriptions/stream_conversations.rs b/xmtp_mls/src/subscriptions/stream_conversations.rs index 0256573e3..7e08a43dc 100644 --- a/xmtp_mls/src/subscriptions/stream_conversations.rs +++ b/xmtp_mls/src/subscriptions/stream_conversations.rs @@ -61,18 +61,23 @@ impl Stream for BroadcastGroupStream { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { use std::task::Poll::*; - let this = self.project(); - if let Some(event) = ready!(this.inner.poll_next(cx)) { - if let Some(group) = - xmtp_common::optify!(event, "Missed messages due to event queue lag") - .and_then(LocalEvents::group_filter) - { - Ready(Some(Ok(WelcomeOrGroup::Group(group)))) + let mut this = self.project(); + // loop until the inner stream returns: + // - Ready with a group + // - Ready(None) - stream ended + // ignore None values, since it is not a group, but may indicate more values in the stream + // itself + loop { + if let Some(event) = ready!(this.inner.as_mut().poll_next(cx)) { + if let Some(group) = + xmtp_common::optify!(event, "Missed messages due to event queue lag") + .and_then(LocalEvents::group_filter) + { + return Ready(Some(Ok(WelcomeOrGroup::Group(group)))); + } } else { - Pending + return Ready(None); } - } else { - Ready(None) } } }