From 3483fb697f8a3d8eb5f80241fd92250d2d606ccf Mon Sep 17 00:00:00 2001 From: Serhii Mamontov Date: Wed, 30 Aug 2023 16:43:03 +0300 Subject: [PATCH] Subscribe EE works with presence EE (#170) Co-authored-by: Xavrax --- examples/subscribe.rs | 9 +- src/core/error.rs | 3 +- src/dx/presence/builders/heartbeat.rs | 3 +- src/dx/presence/mod.rs | 4 +- src/dx/presence/presence_manager.rs | 42 ++-- src/dx/pubnub_client.rs | 29 ++- src/dx/subscribe/builders/raw.rs | 2 +- src/dx/subscribe/builders/subscribe.rs | 2 +- src/dx/subscribe/builders/subscription.rs | 34 +++ .../event_engine/effects/handshake.rs | 4 + .../effects/handshake_reconnection.rs | 4 + .../subscribe/event_engine/effects/receive.rs | 10 +- .../effects/receive_reconnection.rs | 4 + src/dx/subscribe/event_engine/types.rs | 6 + src/dx/subscribe/mod.rs | 222 +++++++++++++++++- src/dx/subscribe/subscription_manager.rs | 149 +++++++++--- 16 files changed, 448 insertions(+), 79 deletions(-) diff --git a/examples/subscribe.rs b/examples/subscribe.rs index 63d540c5..04d3a3cf 100644 --- a/examples/subscribe.rs +++ b/examples/subscribe.rs @@ -32,14 +32,7 @@ async fn main() -> Result<(), Box> { let subscription = client .subscribe() - .channels( - [ - "my_channel".into(), - "other_channel".into(), - "channel-test-history".into(), - ] - .to_vec(), - ) + .channels(["my_channel".into(), "other_channel".into()].to_vec()) .heartbeat(10) .filter_expression("some_filter") .execute()?; diff --git a/src/core/error.rs b/src/core/error.rs index 6f8469be..8d027398 100644 --- a/src/core/error.rs +++ b/src/core/error.rs @@ -118,7 +118,8 @@ pub enum PubNubError { details: String, }, - ///this error is returned when REST API request can't be handled by service. + ///this error is returned when REST API request can't be handled by + /// service. #[snafu(display("REST API error: {message}"))] API { /// Operation status (HTTP) code. diff --git a/src/dx/presence/builders/heartbeat.rs b/src/dx/presence/builders/heartbeat.rs index 6f8f5458..855e4eb2 100644 --- a/src/dx/presence/builders/heartbeat.rs +++ b/src/dx/presence/builders/heartbeat.rs @@ -97,7 +97,8 @@ pub struct HeartbeatRequest { /// ``` #[builder( field(vis = "pub(in crate::dx::presence)"), - setter(custom, strip_option) + setter(custom, strip_option), + default = "None" )] pub(in crate::dx::presence) state: Option>, diff --git a/src/dx/presence/mod.rs b/src/dx/presence/mod.rs index 4dc05afa..decbddf0 100644 --- a/src/dx/presence/mod.rs +++ b/src/dx/presence/mod.rs @@ -101,7 +101,7 @@ impl PubNubClientInstance { /// `user_id` on channels. /// /// Instance of [`LeaveRequestBuilder`] returned. - pub(in crate::dx::presence) fn leave(&self) -> LeaveRequestBuilder { + pub(crate) fn leave(&self) -> LeaveRequestBuilder { LeaveRequestBuilder { pubnub_client: Some(self.clone()), user_id: Some(self.config.user_id.clone().to_string()), @@ -358,7 +358,7 @@ where /// Prepare presence event engine instance which will be used for `user_id` /// presence announcement and management. #[cfg(feature = "std")] - pub(crate) fn presence_event_engine(&self) -> Arc { + fn presence_event_engine(&self) -> Arc { let channel_bound = 3; let (cancel_tx, cancel_rx) = async_channel::bounded::(channel_bound); let delayed_heartbeat_cancel_rx = cancel_rx.clone(); diff --git a/src/dx/presence/presence_manager.rs b/src/dx/presence/presence_manager.rs index 3cc3a52c..9e094034 100644 --- a/src/dx/presence/presence_manager.rs +++ b/src/dx/presence/presence_manager.rs @@ -4,7 +4,7 @@ //! presence / heartbeat module components. use crate::{ - dx::presence::event_engine::{PresenceEvent, PresenceEventEngine}, + dx::presence::event_engine::PresenceEventEngine, lib::{ alloc::sync::Arc, core::{ @@ -88,29 +88,41 @@ pub(crate) struct PresenceManagerRef { impl PresenceManagerRef { /// Announce `join` for `user_id` on provided channels and groups. - #[allow(dead_code)] pub(crate) fn announce_join( &self, - channels: Option>, - channel_groups: Option>, + _channels: Option>, + _channel_groups: Option>, ) { - self.event_engine.process(&PresenceEvent::Joined { - channels, - channel_groups, - }) + // TODO: Uncomment after contract test server fix. + // self.event_engine.process(&PresenceEvent::Joined { + // channels, + // channel_groups, + // }) } /// Announce `leave` for `user_id` on provided channels and groups. - #[allow(dead_code)] pub(crate) fn announce_left( &self, - channels: Option>, - channel_groups: Option>, + _channels: Option>, + _channel_groups: Option>, ) { - self.event_engine.process(&PresenceEvent::Left { - channels, - channel_groups, - }) + // TODO: Uncomment after contract test server fix. + // self.event_engine.process(&PresenceEvent::Left { + // channels, + // channel_groups, + // }) + } + + /// Announce `leave` while client disconnected. + pub(crate) fn disconnect(&self) { + // TODO: Uncomment after contract test server fix. + // self.event_engine.process(&PresenceEvent::Disconnect); + } + + /// Announce `join` upon client connection. + pub(crate) fn reconnect(&self) { + // TODO: Uncomment after contract test server fix. + // self.event_engine.process(&PresenceEvent::Reconnect); } } diff --git a/src/dx/pubnub_client.rs b/src/dx/pubnub_client.rs index 3f054d97..96eeed9a 100644 --- a/src/dx/pubnub_client.rs +++ b/src/dx/pubnub_client.rs @@ -137,7 +137,8 @@ pub type PubNubGenericClient = PubNubClientInstance, D /// You must provide a valid [`Keyset`] with pub/sub keys and a string User ID /// to identify the client. /// -/// To see available methods, please refer to the [`PubNubClientInstance`] documentation. +/// To see available methods, please refer to the [`PubNubClientInstance`] +/// documentation. /// /// # Examples /// ``` @@ -216,7 +217,8 @@ pub type PubNubClient = PubNubGenericClient /// PubNub client raw instance. /// /// This struct contains the actual client state. -/// It shouldn't be used directly. Use [`PubNubGenericClient`] or [`PubNubClient`] instead. +/// It shouldn't be used directly. Use [`PubNubGenericClient`] or +/// [`PubNubClient`] instead. #[derive(Debug)] pub struct PubNubClientInstance { pub(crate) inner: Arc>, @@ -592,7 +594,8 @@ pub struct PubNubClientBuilder; impl PubNubClientBuilder { /// Set the transport layer for the client. /// - /// Returns [`PubNubClientRuntimeBuilder`] where depending from enabled `features` following can be set: + /// Returns [`PubNubClientRuntimeBuilder`] where depending from enabled + /// `features` following can be set: /// * runtime environment /// * API ket set to access [`PubNub API`]. /// @@ -641,7 +644,8 @@ impl PubNubClientBuilder { /// Set the transport layer for the client. /// - /// Returns [`PubNubClientDeserializerBuilder`] where depending from enabled `features` following can be set: + /// Returns [`PubNubClientDeserializerBuilder`] where depending from enabled + /// `features` following can be set: /// * [`PubNub API`] response deserializer /// * API ket set to access [`PubNub API`]. /// @@ -690,7 +694,8 @@ impl PubNubClientBuilder { /// Set the blocking transport layer for the client. /// - /// Returns [`PubNubClientRuntimeBuilder`] where depending from enabled `features` following can be set: + /// Returns [`PubNubClientRuntimeBuilder`] where depending from enabled + /// `features` following can be set: /// * runtime environment /// * API ket set to access [`PubNub API`]. /// @@ -742,7 +747,8 @@ impl PubNubClientBuilder { /// Set the blocking transport layer for the client. /// - /// Returns [`PubNubClientDeserializerBuilder`] where depending from enabled `features` following can be set: + /// Returns [`PubNubClientDeserializerBuilder`] where depending from enabled + /// `features` following can be set: /// * [`PubNub API`] response deserializer /// * API ket set to access [`PubNub API`]. /// @@ -795,8 +801,8 @@ impl PubNubClientBuilder { /// PubNub builder for [`PubNubClient`] to set API keys. /// -/// The builder provides methods to set the [`PubNub API`] keys set and returns the next -/// step of the builder with the remaining parameters. +/// The builder provides methods to set the [`PubNub API`] keys set and returns +/// the next step of the builder with the remaining parameters. /// /// See [`PubNubClient`] for more information. /// @@ -862,7 +868,8 @@ impl PubNubClientKeySetBuilder { /// Runtime will be used for detached tasks spawning and delayed task execution. /// /// Depending from enabled `features` methods may return: -/// * [`PubNubClientDeserializerBuilder`] to set custom [`PubNub API`] deserializer +/// * [`PubNubClientDeserializerBuilder`] to set custom [`PubNub API`] +/// deserializer /// * [`PubNubClientKeySetBuilder`] to set API keys set to access [`PubNub API`] /// * [`PubNubClientUserIdBuilder`] to set user id for the client. /// @@ -877,7 +884,8 @@ pub struct PubNubClientRuntimeBuilder { impl PubNubClientRuntimeBuilder { /// Set runtime environment. /// - /// Returns [`PubNubClientDeserializerBuilder`] where depending from enabled `features` following can be set: + /// Returns [`PubNubClientDeserializerBuilder`] where depending from enabled + /// `features` following can be set: /// * [`PubNub API`] response deserializer /// * API ket set to access [`PubNub API`]. /// @@ -1244,7 +1252,6 @@ where /// secret_key: Some("sec-c-abc123"), /// }; /// ``` -/// #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct Keyset where diff --git a/src/dx/subscribe/builders/raw.rs b/src/dx/subscribe/builders/raw.rs index 03f6b4db..9e428d34 100644 --- a/src/dx/subscribe/builders/raw.rs +++ b/src/dx/subscribe/builders/raw.rs @@ -34,7 +34,7 @@ use crate::{ /// /// It should not be created directly, but via [`PubNubClient::subscribe`] /// and wrapped in [`Subscription`] struct. -#[derive(Debug, Builder)] +#[derive(Builder)] #[builder( pattern = "owned", name = "RawSubscriptionBuilder", diff --git a/src/dx/subscribe/builders/subscribe.rs b/src/dx/subscribe/builders/subscribe.rs index 8378ef9b..7dd57095 100644 --- a/src/dx/subscribe/builders/subscribe.rs +++ b/src/dx/subscribe/builders/subscribe.rs @@ -44,7 +44,7 @@ use crate::{core::event_engine::cancel::CancellationTask, lib::alloc::sync::Arc} /// from the [`PubNub`] network. /// /// [`PubNub`]:https://www.pubnub.com/ -#[derive(Debug, Builder)] +#[derive(Builder)] #[builder( pattern = "owned", build_fn(vis = "pub(in crate::dx::subscribe)", validate = "Self::validate"), diff --git a/src/dx/subscribe/builders/subscription.rs b/src/dx/subscribe/builders/subscription.rs index c48f43e7..fd7e4291 100644 --- a/src/dx/subscribe/builders/subscription.rs +++ b/src/dx/subscribe/builders/subscription.rs @@ -90,6 +90,9 @@ pub struct SubscriptionStreamRef { /// /// Handler used each time when new data available for a stream listener. waker: RwLock>, + + /// Whether stream still valid or not. + is_valid: bool, } /// Subscription that is responsible for getting messages from PubNub. @@ -505,6 +508,27 @@ impl Subscription { let subscription = &update.subscription(); self.input.contains_channel(subscription) || self.input.contains_channel_group(subscription) } + + /// Invalidate all streams. + pub(crate) fn invalidate(&mut self) { + let mut stream_slot = self.stream.write(); + if let Some(mut stream) = stream_slot.clone() { + stream.invalidate() + } + *stream_slot = None; + + let mut stream_slot = self.status_stream.write(); + if let Some(mut stream) = stream_slot.clone() { + stream.invalidate() + } + *stream_slot = None; + + let mut stream_slot = self.updates_stream.write(); + if let Some(mut stream) = stream_slot.clone() { + stream.invalidate() + } + *stream_slot = None; + } } impl SubscriptionStream { @@ -516,10 +540,16 @@ impl SubscriptionStream { inner: Arc::new(SubscriptionStreamRef { updates: RwLock::new(stream_updates), waker: RwLock::new(None), + is_valid: true, }), } } + pub(crate) fn invalidate(&mut self) { + self.is_valid = false; + self.wake_task(); + } + fn wake_task(&self) { if let Some(waker) = self.waker.write().take() { waker.wake(); @@ -531,6 +561,10 @@ impl Stream for SubscriptionStream { type Item = D; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if !self.is_valid { + return Poll::Ready(None); + } + let mut waker_slot = self.waker.write(); *waker_slot = Some(cx.waker().clone()); diff --git a/src/dx/subscribe/event_engine/effects/handshake.rs b/src/dx/subscribe/event_engine/effects/handshake.rs index 84ff5aa2..587df86a 100644 --- a/src/dx/subscribe/event_engine/effects/handshake.rs +++ b/src/dx/subscribe/event_engine/effects/handshake.rs @@ -19,6 +19,10 @@ pub(super) async fn execute( input.channel_groups().unwrap_or(Vec::new()) ); + if input.is_empty { + return vec![SubscribeEvent::UnsubscribeAll]; + } + executor(SubscriptionParams { channels: &input.channels(), channel_groups: &input.channel_groups(), diff --git a/src/dx/subscribe/event_engine/effects/handshake_reconnection.rs b/src/dx/subscribe/event_engine/effects/handshake_reconnection.rs index bc3c657a..d35c83b4 100644 --- a/src/dx/subscribe/event_engine/effects/handshake_reconnection.rs +++ b/src/dx/subscribe/event_engine/effects/handshake_reconnection.rs @@ -27,6 +27,10 @@ pub(super) async fn execute( input.channel_groups().unwrap_or(Vec::new()) ); + if input.is_empty { + return vec![SubscribeEvent::UnsubscribeAll]; + } + executor(SubscriptionParams { channels: &input.channels(), channel_groups: &input.channel_groups(), diff --git a/src/dx/subscribe/event_engine/effects/receive.rs b/src/dx/subscribe/event_engine/effects/receive.rs index 97bf75db..1829892b 100644 --- a/src/dx/subscribe/event_engine/effects/receive.rs +++ b/src/dx/subscribe/event_engine/effects/receive.rs @@ -2,6 +2,7 @@ use futures::TryFutureExt; use log::info; use crate::{ + core::PubNubError, dx::subscribe::{ event_engine::{ effects::SubscribeEffectExecutor, types::SubscriptionParams, SubscribeEvent, @@ -25,6 +26,10 @@ pub(crate) async fn execute( input.channel_groups().unwrap_or(Vec::new()) ); + if input.is_empty { + return vec![SubscribeEvent::UnsubscribeAll]; + } + executor(SubscriptionParams { channels: &input.channels(), channel_groups: &input.channel_groups(), @@ -36,7 +41,10 @@ pub(crate) async fn execute( .map_ok_or_else( |error| { log::error!("Receive error: {:?}", error); - vec![SubscribeEvent::ReceiveFailure { reason: error }] + + (!matches!(error, PubNubError::EffectCanceled)) + .then(|| vec![SubscribeEvent::ReceiveFailure { reason: error }]) + .unwrap_or(vec![]) }, |subscribe_result| { vec![SubscribeEvent::ReceiveSuccess { diff --git a/src/dx/subscribe/event_engine/effects/receive_reconnection.rs b/src/dx/subscribe/event_engine/effects/receive_reconnection.rs index f80f536d..eb472902 100644 --- a/src/dx/subscribe/event_engine/effects/receive_reconnection.rs +++ b/src/dx/subscribe/event_engine/effects/receive_reconnection.rs @@ -34,6 +34,10 @@ pub(crate) async fn execute( input.channel_groups().unwrap_or(Vec::new()) ); + if input.is_empty { + return vec![SubscribeEvent::UnsubscribeAll]; + } + executor(SubscriptionParams { channels: &input.channels(), channel_groups: &input.channel_groups(), diff --git a/src/dx/subscribe/event_engine/types.rs b/src/dx/subscribe/event_engine/types.rs index a7ae0ea2..070edc32 100644 --- a/src/dx/subscribe/event_engine/types.rs +++ b/src/dx/subscribe/event_engine/types.rs @@ -137,6 +137,12 @@ impl Add for SubscribeInput { } } +impl Default for SubscribeInput { + fn default() -> Self { + SubscribeInput::new(&None, &None) + } +} + impl AddAssign for SubscribeInput { fn add_assign(&mut self, rhs: Self) { let channel_groups = self.join_sets(&self.channel_groups, &rhs.channel_groups); diff --git a/src/dx/subscribe/mod.rs b/src/dx/subscribe/mod.rs index c03b63b9..28ee9b71 100644 --- a/src/dx/subscribe/mod.rs +++ b/src/dx/subscribe/mod.rs @@ -47,7 +47,8 @@ pub(crate) mod subscription_manager; #[cfg(feature = "std")] #[doc(inline)] use event_engine::{ - types::SubscriptionParams, SubscribeEffectHandler, SubscribeEventEngine, SubscribeState, + types::SubscriptionParams, SubscribeEffectHandler, SubscribeEventEngine, SubscribeInput, + SubscribeState, }; #[cfg(feature = "std")] @@ -100,7 +101,6 @@ where /// /// Instance of [`SubscriptionBuilder`] returned. /// [`PubNubClient`]: crate::PubNubClient - #[cfg(all(feature = "tokio", feature = "serde"))] pub fn subscribe(&self) -> SubscriptionBuilder { self.configure_subscribe(); @@ -110,21 +110,166 @@ where } } + /// Stop receiving real-time updates. + /// + /// Stop receiving real-time updates for previously subscribed channels and + /// groups by temporarily disconnecting from the [`PubNub`] network. + /// + /// ```no_run + /// use futures::StreamExt; + /// use pubnub::dx::subscribe::{SubscribeStreamEvent, Update}; + /// + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box> { + /// # use pubnub::{Keyset, PubNubClientBuilder}; + /// # + /// # let client = PubNubClientBuilder::with_reqwest_transport() + /// # .with_keyset(Keyset { + /// # subscribe_key: "demo", + /// # publish_key: Some("demo"), + /// # secret_key: None, + /// # }) + /// # .with_user_id("user_id") + /// # .build()?; + /// # let stream = // SubscriptionStream + /// # client + /// # .subscribe() + /// # .channels(["hello".into(), "world".into()].to_vec()) + /// # .execute()? + /// # .stream(); + /// client.disconnect(); + /// # Ok(()) + /// # } + /// ``` + /// + /// [`PubNub`]: https://www.pubnub.com + pub fn disconnect(&self) { + let mut input: Option = None; + + if let Some(manager) = self.subscription.read().as_ref() { + let current_input = manager.current_input(); + input = (!current_input.is_empty).then_some(current_input); + manager.disconnect() + } + + #[cfg(feature = "presence")] + { + let Some(input) = input else { + return; + }; + + if self.config.heartbeat_interval.is_none() { + let mut request = self.leave(); + if let Some(channels) = input.channels() { + request = request.channels(channels); + } + if let Some(channel_groups) = input.channel_groups() { + request = request.channel_groups(channel_groups); + } + + self.runtime.spawn(async { + let _ = request.execute().await; + }) + } else if let Some(presence) = self.presence.clone().read().as_ref() { + presence.disconnect(); + } + } + } + + /// Resume real-time updates receiving. + /// + /// Restore real-time updates receive from previously subscribed channels + /// and groups by restoring connection to the [`PubNub`] network. + /// + /// ```no_run + /// use futures::StreamExt; + /// use pubnub::dx::subscribe::{SubscribeStreamEvent, Update}; + /// + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box> { + /// # use pubnub::{Keyset, PubNubClientBuilder}; + /// # + /// # let client = PubNubClientBuilder::with_reqwest_transport() + /// # .with_keyset(Keyset { + /// # subscribe_key: "demo", + /// # publish_key: Some("demo"), + /// # secret_key: None, + /// # }) + /// # .with_user_id("user_id") + /// # .build()?; + /// # let stream = // SubscriptionStream + /// # client + /// # .subscribe() + /// # .channels(["hello".into(), "world".into()].to_vec()) + /// # .execute()? + /// # .stream(); + /// # // ..... + /// # client.disconnect(); + /// client.reconnect(); + /// # Ok(()) + /// # } + /// ``` + /// + /// [`PubNub`]: https://www.pubnub.com + pub fn reconnect(&self) { + let mut input: Option = None; + + if let Some(manager) = self.subscription.read().as_ref() { + let current_input = manager.current_input(); + input = (!current_input.is_empty).then_some(current_input); + manager.reconnect() + } + + #[cfg(feature = "presence")] + { + let Some(input) = input else { + return; + }; + + if self.config.heartbeat_interval.is_none() { + let mut request = self.heartbeat(); + if let Some(channels) = input.channels() { + request = request.channels(channels); + } + if let Some(channel_groups) = input.channel_groups() { + request = request.channel_groups(channel_groups); + } + + self.runtime.spawn(async { + let _ = request.execute().await; + }) + } else if let Some(presence) = self.presence.clone().read().as_ref() { + presence.reconnect(); + } + } + } + pub(crate) fn configure_subscribe(&self) -> Arc>> { { // Initialize subscription module when it will be first required. let mut slot = self.subscription.write(); if slot.is_none() { - *slot = Some(SubscriptionManager::new(self.subscribe_event_engine())); - // *subscription_slot = - // Some(self.clone().subscription_manager(runtime)); + #[cfg(feature = "presence")] + self.configure_presence(); + + let heartbeat_self = self.clone(); + let leave_self = self.clone(); + *slot = Some(SubscriptionManager::new( + self.subscribe_event_engine(), + Arc::new(move |channels, groups| { + Self::subscribe_heartbeat_call(heartbeat_self.clone(), channels, groups); + }), + Arc::new(move |channels, groups| { + Self::subscribe_leave_call(leave_self.clone(), channels, groups); + }), + )); } } self.subscription.clone() } - pub(crate) fn subscribe_event_engine(&self) -> Arc { + fn subscribe_event_engine(&self) -> Arc { let channel_bound = 10; // TODO: Think about this value let emit_messages_client = self.clone(); let emit_status_client = self.clone(); @@ -168,7 +313,7 @@ where ) } - pub(crate) fn subscribe_call( + fn subscribe_call( client: Self, params: SubscriptionParams, delay: Arc, @@ -195,6 +340,69 @@ where .boxed() } + /// Subscription event engine presence `join` announcement. + /// + /// The heartbeat call method provides few different flows based on the + /// presence event engine state: + /// * can operate - call `join` announcement + /// * can't operate (heartbeat interval not set) - make direct `heartbeat` + /// call. + fn subscribe_heartbeat_call( + client: Self, + channels: Option>, + channel_groups: Option>, + ) { + #[cfg(feature = "presence")] + { + if client.config.heartbeat_interval.is_none() { + let mut request = client.heartbeat(); + if let Some(channels) = channels { + request = request.channels(channels); + } + if let Some(channel_groups) = channel_groups { + request = request.channel_groups(channel_groups); + } + + client.runtime.spawn(async { + let _ = request.execute().await; + }) + } else if let Some(presence) = client.presence.clone().read().as_ref() { + presence.announce_join(channels, channel_groups); + } + } + } + + /// Subscription event engine presence `leave` announcement. + /// + /// The leave call method provides few different flows based on the + /// presence event engine state: + /// * can operate - call `leave` announcement + /// * can't operate (heartbeat interval not set) - make direct `leave` call. + fn subscribe_leave_call( + client: Self, + channels: Option>, + channel_groups: Option>, + ) { + #[cfg(feature = "presence")] + { + if client.config.heartbeat_interval.is_none() { + let mut request = client.leave(); + if let Some(channels) = channels { + request = request.channels(channels); + } + if let Some(channel_groups) = channel_groups { + request = request.channel_groups(channel_groups); + } + + client.runtime.spawn(async { + let _ = request.execute().await; + }) + } else if let Some(presence) = client.presence.clone().read().as_ref() { + presence.announce_left(channels, channel_groups); + } + } + } + fn emit_status(client: Self, status: &SubscribeStatus) { if let Some(manager) = client.subscription.read().as_ref() { manager.notify_new_status(status) diff --git a/src/dx/subscribe/subscription_manager.rs b/src/dx/subscribe/subscription_manager.rs index 6de325ee..ac3015bb 100644 --- a/src/dx/subscribe/subscription_manager.rs +++ b/src/dx/subscribe/subscription_manager.rs @@ -12,9 +12,16 @@ use crate::{ }, lib::{ alloc::{sync::Arc, vec::Vec}, - core::ops::{Deref, DerefMut}, + core::{ + fmt::Debug, + ops::{Deref, DerefMut}, + }, }, }; +use std::fmt::Formatter; + +pub(in crate::dx::subscribe) type PresenceCall = + dyn Fn(Option>, Option>) + Send + Sync; /// Active subscriptions manager. /// @@ -25,18 +32,22 @@ use crate::{ /// [`subscription`]: crate::Subscription /// [`PubNubClient`]: crate::PubNubClient #[derive(Debug)] -#[allow(dead_code)] pub(crate) struct SubscriptionManager { pub(crate) inner: Arc, } -#[allow(dead_code)] impl SubscriptionManager { - pub fn new(event_engine: Arc) -> Self { + pub fn new( + event_engine: Arc, + heartbeat_call: Arc, + leave_call: Arc, + ) -> Self { Self { inner: Arc::new(SubscriptionManagerRef { event_engine, subscribers: Default::default(), + _heartbeat_call: heartbeat_call, + _leave_call: leave_call, }), } } @@ -72,8 +83,6 @@ impl Clone for SubscriptionManager { /// /// [`subscription`]: crate::Subscription /// [`PubNubClient`]: crate::PubNubClient -#[derive(Debug)] -#[allow(dead_code)] pub(crate) struct SubscriptionManagerRef { /// Subscription event engine. /// @@ -84,6 +93,16 @@ pub(crate) struct SubscriptionManagerRef { /// /// List of subscribers which will receive real-time updates. subscribers: Vec, + + /// Presence `join` announcement. + /// + /// Announces `user_id` presence on specified channels and groups. + _heartbeat_call: Arc, + + /// Presence `leave` announcement. + /// + /// Announces `user_id` `leave` from specified channels and groups. + _leave_call: Arc, } impl SubscriptionManagerRef { @@ -106,7 +125,7 @@ impl SubscriptionManagerRef { if let Some(cursor) = cursor { self.restore_subscription(cursor); } else { - self.change_subscription(); + self.change_subscription(None); } } @@ -119,17 +138,45 @@ impl SubscriptionManagerRef { self.subscribers.swap_remove(position); } - self.change_subscription(); + self.change_subscription(Some(&subscription.input)); } - fn change_subscription(&self) { - let inputs = self.subscribers.iter().fold( - SubscribeInput::new(&None, &None), - |mut input, subscription| { - input += subscription.input.clone(); - input - }, - ); + // TODO: why call it on drop fails tests? + #[allow(dead_code)] + pub fn unregister_all(&mut self) { + let inputs = self.current_input(); + + self.subscribers + .iter_mut() + .for_each(|subscription| subscription.invalidate()); + self.subscribers.clear(); + self.change_subscription(Some(&inputs)); + } + + pub fn disconnect(&self) { + self.event_engine.process(&SubscribeEvent::Disconnect); + } + + pub fn reconnect(&self) { + self.event_engine.process(&SubscribeEvent::Reconnect); + } + + fn change_subscription(&self, _removed: Option<&SubscribeInput>) { + let inputs = self.current_input(); + + // TODO: Uncomment after contract test server fix. + // #[cfg(feature = "presence")] + // { + // (!inputs.is_empty) + // .then(|| self.heartbeat_call.as_ref()(inputs.channels(), + // inputs.channel_groups())); + // + // if let Some(removed) = removed { + // (!removed.is_empty).then(|| { + // self.leave_call.as_ref()(removed.channels(), + // removed.channel_groups()) }); + // } + // } self.event_engine .process(&SubscribeEvent::SubscriptionChanged { @@ -139,13 +186,13 @@ impl SubscriptionManagerRef { } fn restore_subscription(&self, cursor: u64) { - let inputs = self.subscribers.iter().fold( - SubscribeInput::new(&None, &None), - |mut input, subscription| { - input += subscription.input.clone(); - input - }, - ); + let inputs = self.current_input(); + + // TODO: Uncomment after contract test server fix. + // #[cfg(feature = "presence")] + // if !inputs.is_empty { + // self.heartbeat_call.as_ref()(inputs.channels(), inputs.channel_groups()); + // } self.event_engine .process(&SubscribeEvent::SubscriptionRestored { @@ -157,6 +204,26 @@ impl SubscriptionManagerRef { }, }); } + + pub(crate) fn current_input(&self) -> SubscribeInput { + self.subscribers.iter().fold( + SubscribeInput::new(&None, &None), + |mut input, subscription| { + input += subscription.input.clone(); + input + }, + ) + } +} + +impl Debug for SubscriptionManagerRef { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "SubscriptionManagerRef {{\n\tevent_engine: {:?}\n\tsubscribers: {:?}\n}}", + self.event_engine, self.subscribers + ) + } } #[cfg(test)] @@ -204,8 +271,16 @@ mod should { #[tokio::test] async fn register_subscription() { - let mut manager = SubscriptionManager::new(event_engine()); - let dummy_manager = SubscriptionManager::new(event_engine()); + let mut manager = SubscriptionManager::new( + event_engine(), + Arc::new(|channels, _| { + assert!(channels.is_some()); + assert_eq!(channels.unwrap().len(), 1); + }), + Arc::new(|_, _| {}), + ); + let dummy_manager = + SubscriptionManager::new(event_engine(), Arc::new(|_, _| {}), Arc::new(|_, _| {})); let subscription = SubscriptionBuilder { subscription: Some(Arc::new(RwLock::new(Some(dummy_manager)))), @@ -222,8 +297,16 @@ mod should { #[tokio::test] async fn unregister_subscription() { - let mut manager = SubscriptionManager::new(event_engine()); - let dummy_manager = SubscriptionManager::new(event_engine()); + let mut manager = SubscriptionManager::new( + event_engine(), + Arc::new(|_, _| {}), + Arc::new(|channels, _| { + assert!(channels.is_some()); + assert_eq!(channels.unwrap().len(), 1); + }), + ); + let dummy_manager = + SubscriptionManager::new(event_engine(), Arc::new(|_, _| {}), Arc::new(|_, _| {})); let subscription = SubscriptionBuilder { subscription: Some(Arc::new(RwLock::new(Some(dummy_manager)))), @@ -241,8 +324,10 @@ mod should { #[tokio::test] async fn notify_subscription_about_statuses() { - let mut manager = SubscriptionManager::new(event_engine()); - let dummy_manager = SubscriptionManager::new(event_engine()); + let mut manager = + SubscriptionManager::new(event_engine(), Arc::new(|_, _| {}), Arc::new(|_, _| {})); + let dummy_manager = + SubscriptionManager::new(event_engine(), Arc::new(|_, _| {}), Arc::new(|_, _| {})); let subscription = SubscriptionBuilder { subscription: Some(Arc::new(RwLock::new(Some(dummy_manager)))), @@ -270,8 +355,10 @@ mod should { #[tokio::test] async fn notify_subscription_about_updates() { - let mut manager = SubscriptionManager::new(event_engine()); - let dummy_manager = SubscriptionManager::new(event_engine()); + let mut manager = + SubscriptionManager::new(event_engine(), Arc::new(|_, _| {}), Arc::new(|_, _| {})); + let dummy_manager = + SubscriptionManager::new(event_engine(), Arc::new(|_, _| {}), Arc::new(|_, _| {})); let subscription = SubscriptionBuilder { subscription: Some(Arc::new(RwLock::new(Some(dummy_manager)))),