Skip to content

Commit

Permalink
refactor(presence): rename set_state to set_presence_state
Browse files Browse the repository at this point in the history
fix(subscribe): fix issue where presence events has been filtered out
  • Loading branch information
parfeon committed Aug 29, 2023
1 parent d05f24a commit c85f301
Show file tree
Hide file tree
Showing 12 changed files with 149 additions and 150 deletions.
2 changes: 1 addition & 1 deletion src/core/event_engine/transition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
///
/// State transition with information about target state and list of effect
/// invocations.
#[allow(dead_code)]
pub(crate) struct Transition<S, I>
where
S: State,
Expand Down
4 changes: 2 additions & 2 deletions src/dx/presence/builders/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ pub(crate) use heartbeat::HeartbeatRequestBuilder;
pub(crate) mod heartbeat;

#[doc(inline)]
pub use set_state::{SetStateRequest, SetStateRequestBuilder};
pub mod set_state;
pub use set_presence_state::{SetStateRequest, SetStateRequestBuilder};
pub mod set_presence_state;

#[doc(inline)]
pub(crate) use leave::LeaveRequestBuilder;
Expand Down
File renamed without changes.
4 changes: 2 additions & 2 deletions src/dx/presence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl<T, D> PubNubClientInstance<T, D> {
/// # .with_user_id("uuid")
/// # .build()?;
/// pubnub
/// .set_state(HashMap::<String, bool>::from(
/// .set_presence_state(HashMap::<String, bool>::from(
/// [("is_admin".into(), false)]
/// ))
/// .channels(["lobby".into(), "announce".into()])
Expand All @@ -145,7 +145,7 @@ impl<T, D> PubNubClientInstance<T, D> {
/// # Ok(())
/// # }
/// ```
pub fn set_state<S>(&self, state: S) -> SetStateRequestBuilder<T, D>
pub fn set_presence_state<S>(&self, state: S) -> SetStateRequestBuilder<T, D>
where
S: Serialize + Send + Sync + 'static,
{
Expand Down
4 changes: 2 additions & 2 deletions src/dx/subscribe/builders/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,8 +502,8 @@ impl Subscription {
}

fn subscribed_for_update(&self, update: &Update) -> bool {
self.input.contains_channel(&update.channel())
|| self.input.contains_channel_group(&update.channel_group())
let subscription = &update.subscription();
self.input.contains_channel(subscription) || self.input.contains_channel_group(subscription)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/dx/subscribe/event_engine/effects/emit_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ mod should {
sender: Some("test-user".into()),
timestamp: 1234567890,
channel: "test".to_string(),
subscription: Some("test-group".to_string()),
subscription: "test-group".to_string(),
data: vec![],
r#type: None,
space_id: None,
Expand Down
6 changes: 1 addition & 5 deletions src/dx/subscribe/event_engine/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,7 @@ impl SubscribeInput {
.map(|ch| ch.into_iter().collect())
}

pub fn contains_channel_group(&self, channel_group: &Option<String>) -> bool {
let Some(channel_group) = channel_group else {
return false;
};

pub fn contains_channel_group(&self, channel_group: &String) -> bool {
self.channel_groups
.as_ref()
.map_or(false, |channel_groups| {
Expand Down
99 changes: 81 additions & 18 deletions src/dx/subscribe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,14 @@ mod should {
Keyset, PubNubClientBuilder, PubNubGenericClient,
};

#[derive(serde::Deserialize)]
struct UserStateData {
#[serde(rename = "admin")]
pub is_admin: bool,
#[serde(rename = "displayName")]
pub display_name: String,
}

struct MockTransport;

#[async_trait::async_trait]
Expand All @@ -312,26 +320,56 @@ mod should {
fn generate_body() -> Option<Vec<u8>> {
Some(
r#"{
"t": {
"t": "15628652479932717",
"r": 4
},
"m": [
{ "a": "1",
"f": 514,
"i": "pn-0ca50551-4bc8-446e-8829-c70b704545fd",
"s": 1,
"p": {
"t": {
"t": "15628652479932717",
"r": 4
},
"m": [
{
"a": "1",
"f": 514,
"i": "pn-0ca50551-4bc8-446e-8829-c70b704545fd",
"s": 1,
"p": {
"t": "15628652479933927",
"r": 4
},
"k": "demo",
"c": "my-channel",
"d": "my message",
"b": "my-channel"
},
{
"a": "5",
"f": 0,
"p": {
"r": 12,
"t": "15800701771129796"
},
"k": "demo",
"u": {
"pn_action": "state-change",
"pn_channel": "my-channel",
"pn_ispresence": 1,
"pn_occupancy": 1,
"pn_timestamp": 1580070177,
"pn_uuid": "pn-0ca50551-4bc8-446e-8829-c70b704545fd"
},
"c": "my-channel-pnpres",
"d": {
"action": "state-change",
"data": {
"admin": true,
"displayName": "ChannelAdmin"
},
"k": "demo",
"c": "my-channel",
"d": "my message",
"b": "my-channel"
}
]
}"#
"occupancy": 1,
"timestamp": 1580070177,
"uuid": "pn-0ca50551-4bc8-446e-8829-c70b704545fd"
},
"b": "my-channel-pnpres"
}
]
}"#
.into(),
)
}
Expand All @@ -357,17 +395,42 @@ mod should {
async fn subscribe() {
let subscription = client()
.subscribe()
.channels(["world".into()].to_vec())
.channels(["my-channel".into(), "my-channel-pnpres".into()].to_vec())
.execute()
.unwrap();

use futures::StreamExt;
let status = subscription.stream().next().await.unwrap();
let message = subscription.stream().next().await.unwrap();
let presence = subscription.stream().next().await.unwrap();

assert!(matches!(
status,
SubscribeStreamEvent::Status(SubscribeStatus::Connected)
));
assert!(matches!(
message,
SubscribeStreamEvent::Update(Update::Message(_))
));
assert!(matches!(
presence,
SubscribeStreamEvent::Update(Update::Presence(_))
));
if let SubscribeStreamEvent::Update(Update::Presence(Presence::StateChange {
timestamp: _,
channel: _,
subscription: _,
uuid: _,
data,
})) = presence
{
let user_data: UserStateData = serde_json::from_value(data)
.expect("Should successfully deserialize user state object.");
assert!(user_data.is_admin);
assert_eq!(user_data.display_name, "ChannelAdmin");
} else {
panic!("Expected to receive presence update.")
}
}

#[tokio::test]
Expand Down
30 changes: 12 additions & 18 deletions src/dx/subscribe/result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,12 @@ pub enum EnvelopePayload {
occupancy: Option<usize>,

/// The user's state associated with the channel has been updated.
data: Option<String>,
#[cfg(feature = "serde")]
data: serde_json::Value,

/// The user's state associated with the channel has been updated.
#[cfg(not(feature = "serde"))]
data: Vec<u8>,

/// The list of unique user identifiers that `joined` the channel since
/// the last interval presence update.
Expand Down Expand Up @@ -539,25 +544,14 @@ impl Envelope {

#[cfg(feature = "std")]
impl Update {
/// Name of channel.
///
/// Name of channel at which update has been received.
pub(crate) fn channel(&self) -> String {
match self {
Update::Presence(presence) => presence.channel(),
Update::Object(object) => object.channel(),
Update::MessageAction(action) => action.channel.to_string(),
Update::File(file) => file.channel.to_string(),
Update::Message(message) | Update::Signal(message) => message.channel.to_string(),
}
}
/// Name of channel.
/// Name of subscription.
///
/// Name of channel at which update has been received.
pub(crate) fn channel_group(&self) -> Option<String> {
/// Name of channel or channel group on which client subscribed and through
/// which real-time update has been delivered.
pub(crate) fn subscription(&self) -> String {
match self {
Update::Presence(presence) => presence.channel_group(),
Update::Object(object) => object.channel_group(),
Update::Presence(presence) => presence.subscription(),
Update::Object(object) => object.subscription(),
Update::MessageAction(action) => action.subscription.clone(),
Update::File(file) => file.subscription.clone(),
Update::Message(message) | Update::Signal(message) => message.subscription.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/dx/subscribe/subscription_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ mod should {
};
use spin::RwLock;

#[allow(dead_code)]
fn event_engine() -> Arc<SubscribeEventEngine> {
let (cancel_tx, _) = async_channel::bounded(1);

Expand Down Expand Up @@ -286,6 +285,7 @@ mod should {

manager.notify_new_messages(vec![Update::Message(Message {
channel: "test".into(),
subscription: "test".into(),
..Default::default()
})]);

Expand Down
Loading

0 comments on commit c85f301

Please sign in to comment.