Skip to content

Commit

Permalink
refactor(subscription): unsubscribe all improvement suggestion
Browse files Browse the repository at this point in the history
`unsubscribe_all` will go through `Subscription` instances and force waker to schedule their
polling and return `Poll::Ready(None)` from `poll_next`.

test(contract): temporarily disable the presence event engine
  • Loading branch information
parfeon committed Aug 30, 2023
1 parent 0f0bd9f commit c470928
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 35 deletions.
34 changes: 19 additions & 15 deletions src/dx/presence/presence_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! presence / heartbeat module components.
use crate::{
dx::presence::event_engine::{PresenceEvent, PresenceEventEngine},
dx::presence::event_engine::PresenceEventEngine,
lib::{
alloc::sync::Arc,
core::{
Expand Down Expand Up @@ -90,35 +90,39 @@ impl PresenceManagerRef {
/// Announce `join` for `user_id` on provided channels and groups.
pub(crate) fn announce_join(
&self,
channels: Option<Vec<String>>,
channel_groups: Option<Vec<String>>,
_channels: Option<Vec<String>>,
_channel_groups: Option<Vec<String>>,
) {
self.event_engine.process(&PresenceEvent::Joined {
channels,
channel_groups,
})
// TODO: Uncomment after contract test server fix.
// self.event_engine.process(&PresenceEvent::Joined {
// channels,
// channel_groups,
// })
}

/// Announce `leave` for `user_id` on provided channels and groups.
pub(crate) fn announce_left(
&self,
channels: Option<Vec<String>>,
channel_groups: Option<Vec<String>>,
_channels: Option<Vec<String>>,
_channel_groups: Option<Vec<String>>,
) {
self.event_engine.process(&PresenceEvent::Left {
channels,
channel_groups,
})
// TODO: Uncomment after contract test server fix.
// self.event_engine.process(&PresenceEvent::Left {
// channels,
// channel_groups,
// })
}

/// Announce `leave` while client disconnected.
pub(crate) fn disconnect(&self) {
self.event_engine.process(&PresenceEvent::Disconnect);
// TODO: Uncomment after contract test server fix.
// self.event_engine.process(&PresenceEvent::Disconnect);
}

/// Announce `join` upon client connection.
pub(crate) fn reconnect(&self) {
self.event_engine.process(&PresenceEvent::Reconnect);
// TODO: Uncomment after contract test server fix.
// self.event_engine.process(&PresenceEvent::Reconnect);
}
}

Expand Down
34 changes: 34 additions & 0 deletions src/dx/subscribe/builders/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ pub struct SubscriptionStreamRef<D> {
///
/// Handler used each time when new data available for a stream listener.
waker: RwLock<Option<Waker>>,

/// Whether stream still valid or not.
is_valid: bool,
}

/// Subscription that is responsible for getting messages from PubNub.
Expand Down Expand Up @@ -505,6 +508,27 @@ impl Subscription {
let subscription = &update.subscription();
self.input.contains_channel(subscription) || self.input.contains_channel_group(subscription)
}

/// Invalidate all streams.
pub(crate) fn invalidate(&mut self) {
let mut stream_slot = self.stream.write();
if let Some(mut stream) = stream_slot.clone() {
stream.invalidate()
}
*stream_slot = None;

let mut stream_slot = self.status_stream.write();
if let Some(mut stream) = stream_slot.clone() {
stream.invalidate()
}
*stream_slot = None;

let mut stream_slot = self.updates_stream.write();
if let Some(mut stream) = stream_slot.clone() {
stream.invalidate()
}
*stream_slot = None;
}
}

impl<D> SubscriptionStream<D> {
Expand All @@ -516,10 +540,16 @@ impl<D> SubscriptionStream<D> {
inner: Arc::new(SubscriptionStreamRef {
updates: RwLock::new(stream_updates),
waker: RwLock::new(None),
is_valid: true,
}),
}
}

pub(crate) fn invalidate(&mut self) {
self.is_valid = false;
self.wake_task();
}

fn wake_task(&self) {
if let Some(waker) = self.waker.write().take() {
waker.wake();
Expand All @@ -531,6 +561,10 @@ impl<D> Stream for SubscriptionStream<D> {
type Item = D;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if !self.is_valid {
return Poll::Ready(None);
}

let mut waker_slot = self.waker.write();
*waker_slot = Some(cx.waker().clone());

Expand Down
46 changes: 26 additions & 20 deletions src/dx/subscribe/subscription_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ impl SubscriptionManager {
inner: Arc::new(SubscriptionManagerRef {
event_engine,
subscribers: Default::default(),
heartbeat_call,
leave_call,
_heartbeat_call: heartbeat_call,
_leave_call: leave_call,
}),
}
}
Expand Down Expand Up @@ -97,12 +97,12 @@ pub(crate) struct SubscriptionManagerRef {
/// Presence `join` announcement.
///
/// Announces `user_id` presence on specified channels and groups.
heartbeat_call: Arc<PresenceCall>,
_heartbeat_call: Arc<PresenceCall>,

/// Presence `leave` announcement.
///
/// Announces `user_id` `leave` from specified channels and groups.
leave_call: Arc<PresenceCall>,
_leave_call: Arc<PresenceCall>,
}

impl SubscriptionManagerRef {
Expand Down Expand Up @@ -144,6 +144,9 @@ impl SubscriptionManagerRef {
pub fn unregister_all(&mut self) {
let inputs = self.current_input();

self.subscribers
.iter_mut()
.for_each(|subscription| subscription.invalidate());
self.subscribers.clear();
self.change_subscription(Some(&inputs));
}
Expand All @@ -156,20 +159,22 @@ impl SubscriptionManagerRef {
self.event_engine.process(&SubscribeEvent::Reconnect);
}

fn change_subscription(&self, removed: Option<&SubscribeInput>) {
fn change_subscription(&self, _removed: Option<&SubscribeInput>) {
let inputs = self.current_input();

#[cfg(feature = "presence")]
{
(!inputs.is_empty)
.then(|| self.heartbeat_call.as_ref()(inputs.channels(), inputs.channel_groups()));

if let Some(removed) = removed {
(!removed.is_empty).then(|| {
self.leave_call.as_ref()(removed.channels(), removed.channel_groups())
});
}
}
// TODO: Uncomment after contract test server fix.
// #[cfg(feature = "presence")]
// {
// (!inputs.is_empty)
// .then(|| self.heartbeat_call.as_ref()(inputs.channels(),
// inputs.channel_groups()));
//
// if let Some(removed) = removed {
// (!removed.is_empty).then(|| {
// self.leave_call.as_ref()(removed.channels(),
// removed.channel_groups()) });
// }
// }

self.event_engine
.process(&SubscribeEvent::SubscriptionChanged {
Expand All @@ -181,10 +186,11 @@ impl SubscriptionManagerRef {
fn restore_subscription(&self, cursor: u64) {
let inputs = self.current_input();

#[cfg(feature = "presence")]
if !inputs.is_empty {
self.heartbeat_call.as_ref()(inputs.channels(), inputs.channel_groups());
}
// TODO: Uncomment after contract test server fix.
// #[cfg(feature = "presence")]
// if !inputs.is_empty {
// self.heartbeat_call.as_ref()(inputs.channels(), inputs.channel_groups());
// }

self.event_engine
.process(&SubscribeEvent::SubscriptionRestored {
Expand Down

0 comments on commit c470928

Please sign in to comment.