From c85f301b7359818aed7b87d0c511ebdb55df9711 Mon Sep 17 00:00:00 2001 From: Serhii Mamontov Date: Tue, 29 Aug 2023 17:20:03 +0300 Subject: [PATCH] refactor(presence): rename `set_state` to `set_presence_state` fix(subscribe): fix issue where presence events has been filtered out --- src/core/event_engine/transition.rs | 2 +- src/dx/presence/builders/mod.rs | 4 +- .../{set_state.rs => set_presence_state.rs} | 0 src/dx/presence/mod.rs | 4 +- src/dx/subscribe/builders/subscription.rs | 4 +- .../event_engine/effects/emit_messages.rs | 2 +- src/dx/subscribe/event_engine/types.rs | 6 +- src/dx/subscribe/mod.rs | 99 +++++++++--- src/dx/subscribe/result.rs | 30 ++-- src/dx/subscribe/subscription_manager.rs | 2 +- src/dx/subscribe/types.rs | 144 ++++++------------ tests/subscribe/subscribe_steps.rs | 2 - 12 files changed, 149 insertions(+), 150 deletions(-) rename src/dx/presence/builders/{set_state.rs => set_presence_state.rs} (100%) diff --git a/src/core/event_engine/transition.rs b/src/core/event_engine/transition.rs index e1ca80ca..6fc4050c 100644 --- a/src/core/event_engine/transition.rs +++ b/src/core/event_engine/transition.rs @@ -7,7 +7,7 @@ use crate::{ /// /// State transition with information about target state and list of effect /// invocations. -#[allow(dead_code)] + pub(crate) struct Transition where S: State, diff --git a/src/dx/presence/builders/mod.rs b/src/dx/presence/builders/mod.rs index 6951d9c1..4c3a3e23 100644 --- a/src/dx/presence/builders/mod.rs +++ b/src/dx/presence/builders/mod.rs @@ -10,8 +10,8 @@ pub(crate) use heartbeat::HeartbeatRequestBuilder; pub(crate) mod heartbeat; #[doc(inline)] -pub use set_state::{SetStateRequest, SetStateRequestBuilder}; -pub mod set_state; +pub use set_presence_state::{SetStateRequest, SetStateRequestBuilder}; +pub mod set_presence_state; #[doc(inline)] pub(crate) use leave::LeaveRequestBuilder; diff --git a/src/dx/presence/builders/set_state.rs b/src/dx/presence/builders/set_presence_state.rs similarity index 100% rename from src/dx/presence/builders/set_state.rs rename to src/dx/presence/builders/set_presence_state.rs diff --git a/src/dx/presence/mod.rs b/src/dx/presence/mod.rs index 91a9c051..794b22af 100644 --- a/src/dx/presence/mod.rs +++ b/src/dx/presence/mod.rs @@ -135,7 +135,7 @@ impl PubNubClientInstance { /// # .with_user_id("uuid") /// # .build()?; /// pubnub - /// .set_state(HashMap::::from( + /// .set_presence_state(HashMap::::from( /// [("is_admin".into(), false)] /// )) /// .channels(["lobby".into(), "announce".into()]) @@ -145,7 +145,7 @@ impl PubNubClientInstance { /// # Ok(()) /// # } /// ``` - pub fn set_state(&self, state: S) -> SetStateRequestBuilder + pub fn set_presence_state(&self, state: S) -> SetStateRequestBuilder where S: Serialize + Send + Sync + 'static, { diff --git a/src/dx/subscribe/builders/subscription.rs b/src/dx/subscribe/builders/subscription.rs index 61295cce..c48f43e7 100644 --- a/src/dx/subscribe/builders/subscription.rs +++ b/src/dx/subscribe/builders/subscription.rs @@ -502,8 +502,8 @@ impl Subscription { } fn subscribed_for_update(&self, update: &Update) -> bool { - self.input.contains_channel(&update.channel()) - || self.input.contains_channel_group(&update.channel_group()) + let subscription = &update.subscription(); + self.input.contains_channel(subscription) || self.input.contains_channel_group(subscription) } } diff --git a/src/dx/subscribe/event_engine/effects/emit_messages.rs b/src/dx/subscribe/event_engine/effects/emit_messages.rs index 42c39f02..8ab16b8b 100644 --- a/src/dx/subscribe/event_engine/effects/emit_messages.rs +++ b/src/dx/subscribe/event_engine/effects/emit_messages.rs @@ -29,7 +29,7 @@ mod should { sender: Some("test-user".into()), timestamp: 1234567890, channel: "test".to_string(), - subscription: Some("test-group".to_string()), + subscription: "test-group".to_string(), data: vec![], r#type: None, space_id: None, diff --git a/src/dx/subscribe/event_engine/types.rs b/src/dx/subscribe/event_engine/types.rs index cad3a70a..a7ae0ea2 100644 --- a/src/dx/subscribe/event_engine/types.rs +++ b/src/dx/subscribe/event_engine/types.rs @@ -86,11 +86,7 @@ impl SubscribeInput { .map(|ch| ch.into_iter().collect()) } - pub fn contains_channel_group(&self, channel_group: &Option) -> bool { - let Some(channel_group) = channel_group else { - return false; - }; - + pub fn contains_channel_group(&self, channel_group: &String) -> bool { self.channel_groups .as_ref() .map_or(false, |channel_groups| { diff --git a/src/dx/subscribe/mod.rs b/src/dx/subscribe/mod.rs index 64120c40..c03b63b9 100644 --- a/src/dx/subscribe/mod.rs +++ b/src/dx/subscribe/mod.rs @@ -286,6 +286,14 @@ mod should { Keyset, PubNubClientBuilder, PubNubGenericClient, }; + #[derive(serde::Deserialize)] + struct UserStateData { + #[serde(rename = "admin")] + pub is_admin: bool, + #[serde(rename = "displayName")] + pub display_name: String, + } + struct MockTransport; #[async_trait::async_trait] @@ -312,26 +320,56 @@ mod should { fn generate_body() -> Option> { Some( r#"{ - "t": { - "t": "15628652479932717", - "r": 4 - }, - "m": [ - { "a": "1", - "f": 514, - "i": "pn-0ca50551-4bc8-446e-8829-c70b704545fd", - "s": 1, - "p": { + "t": { + "t": "15628652479932717", + "r": 4 + }, + "m": [ + { + "a": "1", + "f": 514, + "i": "pn-0ca50551-4bc8-446e-8829-c70b704545fd", + "s": 1, + "p": { "t": "15628652479933927", "r": 4 + }, + "k": "demo", + "c": "my-channel", + "d": "my message", + "b": "my-channel" + }, + { + "a": "5", + "f": 0, + "p": { + "r": 12, + "t": "15800701771129796" + }, + "k": "demo", + "u": { + "pn_action": "state-change", + "pn_channel": "my-channel", + "pn_ispresence": 1, + "pn_occupancy": 1, + "pn_timestamp": 1580070177, + "pn_uuid": "pn-0ca50551-4bc8-446e-8829-c70b704545fd" + }, + "c": "my-channel-pnpres", + "d": { + "action": "state-change", + "data": { + "admin": true, + "displayName": "ChannelAdmin" }, - "k": "demo", - "c": "my-channel", - "d": "my message", - "b": "my-channel" - } - ] - }"# + "occupancy": 1, + "timestamp": 1580070177, + "uuid": "pn-0ca50551-4bc8-446e-8829-c70b704545fd" + }, + "b": "my-channel-pnpres" + } + ] + }"# .into(), ) } @@ -357,17 +395,42 @@ mod should { async fn subscribe() { let subscription = client() .subscribe() - .channels(["world".into()].to_vec()) + .channels(["my-channel".into(), "my-channel-pnpres".into()].to_vec()) .execute() .unwrap(); use futures::StreamExt; let status = subscription.stream().next().await.unwrap(); + let message = subscription.stream().next().await.unwrap(); + let presence = subscription.stream().next().await.unwrap(); assert!(matches!( status, SubscribeStreamEvent::Status(SubscribeStatus::Connected) )); + assert!(matches!( + message, + SubscribeStreamEvent::Update(Update::Message(_)) + )); + assert!(matches!( + presence, + SubscribeStreamEvent::Update(Update::Presence(_)) + )); + if let SubscribeStreamEvent::Update(Update::Presence(Presence::StateChange { + timestamp: _, + channel: _, + subscription: _, + uuid: _, + data, + })) = presence + { + let user_data: UserStateData = serde_json::from_value(data) + .expect("Should successfully deserialize user state object."); + assert!(user_data.is_admin); + assert_eq!(user_data.display_name, "ChannelAdmin"); + } else { + panic!("Expected to receive presence update.") + } } #[tokio::test] diff --git a/src/dx/subscribe/result.rs b/src/dx/subscribe/result.rs index 8fdc3292..24546434 100644 --- a/src/dx/subscribe/result.rs +++ b/src/dx/subscribe/result.rs @@ -303,7 +303,12 @@ pub enum EnvelopePayload { occupancy: Option, /// The user's state associated with the channel has been updated. - data: Option, + #[cfg(feature = "serde")] + data: serde_json::Value, + + /// The user's state associated with the channel has been updated. + #[cfg(not(feature = "serde"))] + data: Vec, /// The list of unique user identifiers that `joined` the channel since /// the last interval presence update. @@ -539,25 +544,14 @@ impl Envelope { #[cfg(feature = "std")] impl Update { - /// Name of channel. - /// - /// Name of channel at which update has been received. - pub(crate) fn channel(&self) -> String { - match self { - Update::Presence(presence) => presence.channel(), - Update::Object(object) => object.channel(), - Update::MessageAction(action) => action.channel.to_string(), - Update::File(file) => file.channel.to_string(), - Update::Message(message) | Update::Signal(message) => message.channel.to_string(), - } - } - /// Name of channel. + /// Name of subscription. /// - /// Name of channel at which update has been received. - pub(crate) fn channel_group(&self) -> Option { + /// Name of channel or channel group on which client subscribed and through + /// which real-time update has been delivered. + pub(crate) fn subscription(&self) -> String { match self { - Update::Presence(presence) => presence.channel_group(), - Update::Object(object) => object.channel_group(), + Update::Presence(presence) => presence.subscription(), + Update::Object(object) => object.subscription(), Update::MessageAction(action) => action.subscription.clone(), Update::File(file) => file.subscription.clone(), Update::Message(message) | Update::Signal(message) => message.subscription.clone(), diff --git a/src/dx/subscribe/subscription_manager.rs b/src/dx/subscribe/subscription_manager.rs index 6a5b2dcf..6de325ee 100644 --- a/src/dx/subscribe/subscription_manager.rs +++ b/src/dx/subscribe/subscription_manager.rs @@ -175,7 +175,6 @@ mod should { }; use spin::RwLock; - #[allow(dead_code)] fn event_engine() -> Arc { let (cancel_tx, _) = async_channel::bounded(1); @@ -286,6 +285,7 @@ mod should { manager.notify_new_messages(vec![Update::Message(Message { channel: "test".into(), + subscription: "test".into(), ..Default::default() })]); diff --git a/src/dx/subscribe/types.rs b/src/dx/subscribe/types.rs index 3d5e2e07..09876ad3 100644 --- a/src/dx/subscribe/types.rs +++ b/src/dx/subscribe/types.rs @@ -5,6 +5,7 @@ use crate::{ dx::subscribe::result::{Envelope, EnvelopePayload, ObjectDataBody, Update}, lib::{ alloc::{ + borrow::ToOwned, boxed::Box, string::{String, ToString}, sync::Arc, @@ -130,10 +131,7 @@ pub enum Presence { /// Actual name of subscription through which `user joined` update has /// been delivered. - /// - /// Will be always `None` except case when update has been delivered - /// not from the channel but from the group. - subscription: Option, + subscription: String, /// Current channel occupancy after user joined. occupancy: usize, @@ -151,10 +149,7 @@ pub enum Presence { /// Actual name of subscription through which `user left` update has /// been delivered. - /// - /// Will be always `None` except case when update has been delivered - /// not from the channel but from the group. - subscription: Option, + subscription: String, /// Current channel occupancy after user left. occupancy: usize, @@ -175,10 +170,7 @@ pub enum Presence { /// Actual name of subscription through which `user timeout` update has /// been delivered. - /// - /// Will be always `None` except case when update has been delivered - /// not from the channel but from the group. - subscription: Option, + subscription: String, /// Current channel occupancy after user timeout. occupancy: usize, @@ -200,10 +192,7 @@ pub enum Presence { /// Actual name of subscription through which `interval` update has been /// delivered. - /// - /// Will be always `None` except case when update has been delivered - /// not from the channel but from the group. - subscription: Option, + subscription: String, /// Current channel occupancy. occupancy: usize, @@ -234,16 +223,18 @@ pub enum Presence { /// Actual name of subscription through which `state changed` update has /// been delivered. - /// - /// Will be always `None` except case when update has been delivered - /// not from the channel but from the group. - subscription: Option, + subscription: String, /// Unique identification of the user for which state has been changed. uuid: String, /// The user's state associated with the channel has been updated. - data: Option, + #[cfg(feature = "serde")] + data: serde_json::Value, + + /// The user's state associated with the channel has been updated. + #[cfg(not(feature = "serde"))] + data: Vec, }, } @@ -289,10 +280,7 @@ pub enum Object { /// Actual name of subscription through which `channel object` update /// has been delivered. - /// - /// Will be always `None` except case when update has been delivered - /// not from the channel but from the group. - subscription: Option, + subscription: String, }, /// `UUID` object update. @@ -336,10 +324,7 @@ pub enum Object { /// Actual name of subscription through which `uuid object` update has /// been delivered. - /// - /// Will be always `None` except case when update has been delivered - /// not from the channel but from the group. - subscription: Option, + subscription: String, }, /// `Membership` object update. @@ -372,10 +357,7 @@ pub enum Object { /// Actual name of subscription through which `membership` update has /// been delivered. - /// - /// Will be always `None` except case when update has been delivered - /// not from the channel but from the group. - subscription: Option, + subscription: String, }, } @@ -394,10 +376,7 @@ pub struct Message { pub channel: String, /// Actual name of subscription through which update has been delivered. - /// - /// Will be always `None` except case when update has been delivered - /// not from the channel but from the group. - pub subscription: Option, + pub subscription: String, /// Data published along with message / signal. pub data: Vec, @@ -440,10 +419,7 @@ pub struct MessageAction { pub channel: String, /// Actual name of subscription through which update has been delivered. - /// - /// Will be always `None` except case when update has been delivered - /// not from the channel but from the group. - pub subscription: Option, + pub subscription: String, /// Timetoken of message for which action has been added / removed. pub message_timetoken: String, @@ -475,10 +451,7 @@ pub struct File { pub channel: String, /// Actual name of subscription through which update has been delivered. - /// - /// Will be always `None` except case when update has been delivered - /// not from the channel but from the group. - pub subscription: Option, + pub subscription: String, /// Message which has been associated with uploaded file. message: String, @@ -573,58 +546,28 @@ impl core::fmt::Display for SubscribeStatus { #[cfg(feature = "std")] impl Presence { - /// Presence update channel. + /// Name of subscription. /// - /// Name of channel at which presence update has been triggered. - pub(crate) fn channel(&self) -> String { - match self { - Presence::Join { channel, .. } - | Presence::Leave { channel, .. } - | Presence::Timeout { channel, .. } - | Presence::Interval { channel, .. } - | Presence::StateChange { channel, .. } => channel - .split('-') - .last() - .map(|name| name.to_string()) - .unwrap_or(channel.to_string()), - } - } - - /// Presence update channel group. - /// - /// Name of channel group through which presence update has been triggered. - pub(crate) fn channel_group(&self) -> Option { + /// Name of channel or channel group on which client subscribed and through + /// which presence update has been delivered. + pub(crate) fn subscription(&self) -> String { match self { Presence::Join { subscription, .. } | Presence::Leave { subscription, .. } | Presence::Timeout { subscription, .. } | Presence::Interval { subscription, .. } - | Presence::StateChange { subscription, .. } => subscription.clone().map(|s| { - s.split('-') - .last() - .map(|name| name.to_string()) - .unwrap_or(s) - }), + | Presence::StateChange { subscription, .. } => subscription.clone(), } } } #[cfg(feature = "std")] impl Object { - /// Object channel name. + /// Name of subscription. /// - /// Name of channel (object id) at which object update has been triggered. - pub(crate) fn channel(&self) -> String { - match self { - Object::Channel { id, .. } | Object::Uuid { id, .. } => id.to_string(), - Object::Membership { uuid, .. } => uuid.to_string(), - } - } - - /// Object channel group name. - /// - /// Name of channel group through which object update has been triggered. - pub(crate) fn channel_group(&self) -> Option { + /// Name of channel or channel group on which client subscribed and through + /// which which object update has been triggered. + pub(crate) fn subscription(&self) -> String { match self { Object::Channel { subscription, .. } | Object::Uuid { subscription, .. } @@ -692,6 +635,7 @@ impl TryFrom for Presence { let action = action.unwrap_or("interval".to_string()); let subscription = resolve_subscription_value(value.subscription, &value.channel); + let channel = value.channel.replace("-pnpres", ""); match action.as_str() { "join" => Ok(Self::Join { @@ -699,7 +643,7 @@ impl TryFrom for Presence { // `join` event always has `uuid` and unwrap_or default // value won't be actually used. uuid: uuid.unwrap_or("".to_string()), - channel: value.channel.clone(), + channel, subscription, occupancy: occupancy.unwrap_or(0), }), @@ -708,7 +652,7 @@ impl TryFrom for Presence { // `leave` event always has `uuid` and unwrap_or default // value won't be actually used. uuid: uuid.unwrap_or("".to_string()), - channel: value.channel.clone(), + channel, subscription, occupancy: occupancy.unwrap_or(0), }), @@ -717,13 +661,13 @@ impl TryFrom for Presence { // `leave` event always has `uuid` and unwrap_or default // value won't be actually used. uuid: uuid.unwrap_or("".to_string()), - channel: value.channel.clone(), + channel, subscription, occupancy: occupancy.unwrap_or(0), }), "interval" => Ok(Self::Interval { timestamp, - channel: value.channel.clone(), + channel, subscription, occupancy: occupancy.unwrap_or(0), join, @@ -735,7 +679,7 @@ impl TryFrom for Presence { // `state-change` event always has `uuid` and unwrap_or // default value won't be actually used. uuid: uuid.unwrap_or("".to_string()), - channel: value.channel.clone(), + channel, subscription, data, }), @@ -964,8 +908,8 @@ impl TryFrom for File { } } -fn resolve_subscription_value(subscription: Option, channel: &str) -> Option { - subscription.and_then(|s| s.ne(channel).then_some(s)) +fn resolve_subscription_value(subscription: Option, channel: &str) -> String { + subscription.unwrap_or(channel.to_owned()) } // TODO: add tests for complicated froms. @@ -974,13 +918,17 @@ mod should { use super::*; use test_case::test_case; - #[test_case(None, "channel" => None; "no subscription")] - #[test_case(Some("channel".into()), "channel" => None; "same subscription and channel")] - #[test_case(Some("channel".into()), "channel2" => Some("channel".into()); "different subscription and channel")] - fn resolve_subscription_field_value( - subscription: Option, - channel: &str, - ) -> Option { + #[test_case( + None, + "channel" => "channel".to_string(); + "no subscription" + )] + #[test_case( + Some("channel".into()), + "channel2" => "channel".to_string(); + "different subscription and channel" + )] + fn resolve_subscription_field_value(subscription: Option, channel: &str) -> String { resolve_subscription_value(subscription, channel) } } diff --git a/tests/subscribe/subscribe_steps.rs b/tests/subscribe/subscribe_steps.rs index 1e19a638..a473ba22 100644 --- a/tests/subscribe/subscribe_steps.rs +++ b/tests/subscribe/subscribe_steps.rs @@ -117,8 +117,6 @@ async fn receive_an_error_subscribe_retry(world: &mut PubNubWorld) { _ = subscription.next().fuse() => panic!("Message update from server") } - println!("~~~> {:?}", &world.retry_policy); - let expected_retry_count: usize = usize::from(match &world.retry_policy.clone().unwrap() { RequestRetryPolicy::Linear { max_retry, .. } | RequestRetryPolicy::Exponential { max_retry, .. } => *max_retry,