diff --git a/src/dx/subscribe/builders/subscription.rs b/src/dx/subscribe/builders/subscription.rs index c07b4d17..98dc8f9f 100644 --- a/src/dx/subscribe/builders/subscription.rs +++ b/src/dx/subscribe/builders/subscription.rs @@ -505,7 +505,9 @@ impl Subscription { fn subscribed_for_update(&self, update: &Update) -> bool { self.channels.contains(&update.channel()) - || self.channel_groups.contains(&update.channel_group()) + || update + .channel_group() + .is_some_and(|g| self.channel_groups.contains(&g)) } } diff --git a/src/dx/subscribe/event_engine/effect_handler.rs b/src/dx/subscribe/event_engine/effect_handler.rs index d209312e..343c3c9d 100644 --- a/src/dx/subscribe/event_engine/effect_handler.rs +++ b/src/dx/subscribe/event_engine/effect_handler.rs @@ -61,20 +61,24 @@ impl EffectHandler for SubscribeEffe SubscribeEffectInvocation::Handshake { channels, channel_groups, + cursor, } => Some(SubscribeEffect::Handshake { channels: channels.clone(), channel_groups: channel_groups.clone(), + cursor: cursor.clone(), executor: self.subscribe_call.clone(), cancellation_channel: self.cancellation_channel.clone(), }), SubscribeEffectInvocation::HandshakeReconnect { channels, channel_groups, + cursor, attempts, reason, } => Some(SubscribeEffect::HandshakeReconnect { channels: channels.clone(), channel_groups: channel_groups.clone(), + cursor: cursor.clone(), attempts: *attempts, reason: reason.clone(), retry_policy: self.retry_policy.clone(), diff --git a/src/dx/subscribe/event_engine/effects/emit_messages.rs b/src/dx/subscribe/event_engine/effects/emit_messages.rs index 8ab16b8b..42c39f02 100644 --- a/src/dx/subscribe/event_engine/effects/emit_messages.rs +++ b/src/dx/subscribe/event_engine/effects/emit_messages.rs @@ -29,7 +29,7 @@ mod should { sender: Some("test-user".into()), timestamp: 1234567890, channel: "test".to_string(), - subscription: "test-group".to_string(), + subscription: Some("test-group".to_string()), data: vec![], r#type: None, space_id: None, diff --git a/src/dx/subscribe/event_engine/effects/handshake.rs b/src/dx/subscribe/event_engine/effects/handshake.rs index 077e9be3..c8f74dc8 100644 --- a/src/dx/subscribe/event_engine/effects/handshake.rs +++ b/src/dx/subscribe/event_engine/effects/handshake.rs @@ -53,8 +53,8 @@ mod should { let mock_handshake_function: Arc = Arc::new(move |params| { assert_eq!(params.channels, &Some(vec!["ch1".to_string()])); assert_eq!(params.channel_groups, &Some(vec!["cg1".to_string()])); - assert_eq!(params.attempt, 0); assert_eq!(params.cursor, None); + assert_eq!(params.attempt, 0); assert_eq!(params.reason, None); assert_eq!(params.effect_id, "id"); diff --git a/src/dx/subscribe/event_engine/effects/handshake_reconnection.rs b/src/dx/subscribe/event_engine/effects/handshake_reconnection.rs index d7a5130d..e6052c95 100644 --- a/src/dx/subscribe/event_engine/effects/handshake_reconnection.rs +++ b/src/dx/subscribe/event_engine/effects/handshake_reconnection.rs @@ -64,8 +64,8 @@ mod should { let mock_handshake_function: Arc = Arc::new(move |params| { assert_eq!(params.channels, &Some(vec!["ch1".to_string()])); assert_eq!(params.channel_groups, &Some(vec!["cg1".to_string()])); - assert_eq!(params.attempt, 1); assert_eq!(params.cursor, None); + assert_eq!(params.attempt, 1); assert_eq!( params.reason.unwrap(), PubNubError::Transport { diff --git a/src/dx/subscribe/event_engine/effects/mod.rs b/src/dx/subscribe/event_engine/effects/mod.rs index 768b6444..edf54ff2 100644 --- a/src/dx/subscribe/event_engine/effects/mod.rs +++ b/src/dx/subscribe/event_engine/effects/mod.rs @@ -45,6 +45,12 @@ pub(crate) enum SubscribeEffect { /// after initial subscription completion. channel_groups: Option>, + /// Time cursor. + /// + /// Cursor used by subscription loop to identify point in time after + /// which updates will be delivered. + cursor: Option, + /// Executor function. /// /// Function which will be used to execute initial subscription. @@ -70,6 +76,12 @@ pub(crate) enum SubscribeEffect { /// initial subscription. channel_groups: Option>, + /// Time cursor. + /// + /// Cursor used by subscription loop to identify point in time after + /// which updates will be delivered. + cursor: Option, + /// Current initial subscribe retry attempt. /// /// Used to track overall number of initial subscription retry attempts. @@ -280,7 +292,8 @@ impl Effect for SubscribeEffect { channels, channel_groups, *attempts, - reason.clone(), // TODO: Does run function need to borrow self? Or we can consume it? + reason.clone(), /* TODO: Does run function need to borrow self? Or we can + * consume it? */ &self.id(), retry_policy, executor, @@ -309,7 +322,8 @@ impl Effect for SubscribeEffect { channel_groups, cursor, *attempts, - reason.clone(), // TODO: Does run function need to borrow self? Or we can consume it? + reason.clone(), /* TODO: Does run function need to borrow self? Or we can + * consume it? */ &self.id(), retry_policy, executor, @@ -357,12 +371,13 @@ mod should { use super::*; #[tokio::test] - async fn send_cancelation_notification() { + async fn send_cancellation_notification() { let (tx, rx) = async_channel::bounded(1); let effect = SubscribeEffect::Handshake { channels: None, channel_groups: None, + cursor: None, executor: Arc::new(|_| { Box::pin(async move { Ok(SubscribeResult { diff --git a/src/dx/subscribe/event_engine/invocation.rs b/src/dx/subscribe/event_engine/invocation.rs index 17bfd437..e1b16c3d 100644 --- a/src/dx/subscribe/event_engine/invocation.rs +++ b/src/dx/subscribe/event_engine/invocation.rs @@ -31,6 +31,12 @@ pub(crate) enum SubscribeEffectInvocation { /// List of channel groups which will be source of real-time updates /// after initial subscription completion. channel_groups: Option>, + + /// Time cursor. + /// + /// Cursor used by subscription loop to identify point in time after + /// which updates will be delivered. + cursor: Option, }, /// Cancel initial subscribe effect invocation. @@ -50,6 +56,12 @@ pub(crate) enum SubscribeEffectInvocation { /// initial subscription. channel_groups: Option>, + /// Time cursor. + /// + /// Cursor used by subscription loop to identify point in time after + /// which updates will be delivered. + cursor: Option, + /// Current initial subscribe retry attempt. /// /// Used to track overall number of initial subscription retry attempts. diff --git a/src/dx/subscribe/event_engine/state.rs b/src/dx/subscribe/event_engine/state.rs index acbe78ab..ace2ec52 100644 --- a/src/dx/subscribe/event_engine/state.rs +++ b/src/dx/subscribe/event_engine/state.rs @@ -44,6 +44,12 @@ pub(crate) enum SubscribeState { /// List of channel groups which will be source of real-time updates /// after initial subscription completion. channel_groups: Option>, + + /// Time cursor. + /// + /// Cursor used by subscription loop to identify point in time after + /// which updates will be delivered. + cursor: Option, }, /// Subscription recover state. @@ -62,6 +68,12 @@ pub(crate) enum SubscribeState { /// initial subscription. channel_groups: Option>, + /// Time cursor. + /// + /// Cursor used by subscription loop to identify point in time after + /// which updates will be delivered. + cursor: Option, + /// Current initial subscribe retry attempt. /// /// Used to track overall number of initial subscription retry attempts. @@ -87,6 +99,12 @@ pub(crate) enum SubscribeState { /// /// List of channel groups for which initial subscription stopped. channel_groups: Option>, + + /// Time cursor. + /// + /// Cursor used by subscription loop to identify point in time after + /// which updates will be delivered. + cursor: Option, }, /// Initial subscription failure state. @@ -106,6 +124,12 @@ pub(crate) enum SubscribeState { /// initial subscription. channel_groups: Option>, + /// Time cursor. + /// + /// Cursor used by subscription loop to identify point in time after + /// which updates will be delivered. + cursor: Option, + /// Initial subscribe attempt failure reason. reason: PubNubError, }, @@ -225,33 +249,50 @@ impl SubscribeState { channel_groups: &Option>, ) -> Option> { match self { - Self::Unsubscribed - | Self::Handshaking { .. } - | Self::HandshakeReconnecting { .. } - | Self::HandshakeFailed { .. } => Some(self.transition_to( + Self::Unsubscribed => Some(self.transition_to( Self::Handshaking { channels: channels.clone(), channel_groups: channel_groups.clone(), + cursor: None, }, None, )), - Self::HandshakeStopped { .. } => Some(self.transition_to( - Self::HandshakeStopped { + Self::Handshaking { cursor, .. } + | Self::HandshakeReconnecting { cursor, .. } + | Self::HandshakeFailed { cursor, .. } => Some(self.transition_to( + Self::Handshaking { channels: channels.clone(), channel_groups: channel_groups.clone(), + cursor: cursor.clone(), }, None, )), - Self::Receiving { cursor, .. } - | Self::ReceiveReconnecting { cursor, .. } - | Self::ReceiveFailed { cursor, .. } => Some(self.transition_to( - Self::Receiving { + Self::HandshakeStopped { cursor, .. } => Some(self.transition_to( + Self::Handshaking { channels: channels.clone(), channel_groups: channel_groups.clone(), cursor: cursor.clone(), }, None, )), + Self::Receiving { cursor, .. } | Self::ReceiveReconnecting { cursor, .. } => { + Some(self.transition_to( + Self::Receiving { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + cursor: cursor.clone(), + }, + None, + )) + } + Self::ReceiveFailed { cursor, .. } => Some(self.transition_to( + Self::Handshaking { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + cursor: Some(cursor.clone()), + }, + None, + )), Self::ReceiveStopped { cursor, .. } => Some(self.transition_to( Self::ReceiveStopped { channels: channels.clone(), @@ -271,33 +312,52 @@ impl SubscribeState { &self, channels: &Option>, channel_groups: &Option>, - cursor: &SubscribeCursor, + restore_cursor: &SubscribeCursor, ) -> Option> { match self { - Self::Unsubscribed - | Self::Handshaking { .. } - | Self::HandshakeReconnecting { .. } - | Self::HandshakeFailed { .. } - | Self::Receiving { .. } - | Self::ReceiveReconnecting { .. } - | Self::ReceiveFailed { .. } => Some(self.transition_to( + Self::Unsubscribed => Some(self.transition_to( + Self::Handshaking { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + cursor: Some(restore_cursor.clone()), + }, + None, + )), + Self::Handshaking { cursor, .. } + | Self::HandshakeReconnecting { cursor, .. } + | Self::HandshakeFailed { cursor, .. } + | Self::HandshakeStopped { cursor, .. } => Some(self.transition_to( + Self::Handshaking { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + cursor: Some(cursor.clone().unwrap_or(restore_cursor.clone())), + }, + None, + )), + Self::Receiving { .. } | Self::ReceiveReconnecting { .. } => Some(self.transition_to( Self::Receiving { channels: channels.clone(), channel_groups: channel_groups.clone(), - cursor: cursor.clone(), + cursor: restore_cursor.clone(), + }, + None, + )), + Self::ReceiveFailed { .. } => Some(self.transition_to( + Self::Handshaking { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + cursor: Some(restore_cursor.clone()), + }, + None, + )), + Self::ReceiveStopped { .. } => Some(self.transition_to( + Self::ReceiveStopped { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + cursor: restore_cursor.clone(), }, None, )), - Self::HandshakeStopped { .. } | Self::ReceiveStopped { .. } => { - Some(self.transition_to( - Self::ReceiveStopped { - channels: channels.clone(), - channel_groups: channel_groups.clone(), - cursor: cursor.clone(), - }, - None, - )) - } } } @@ -307,22 +367,24 @@ impl SubscribeState { /// first time. fn handshake_success_transition( &self, - cursor: &SubscribeCursor, + next_cursor: &SubscribeCursor, ) -> Option> { match self { Self::Handshaking { channels, channel_groups, + cursor, } | Self::HandshakeReconnecting { channels, channel_groups, + cursor, .. } => Some(self.transition_to( Self::Receiving { channels: channels.clone(), channel_groups: channel_groups.clone(), - cursor: cursor.clone(), + cursor: cursor.clone().unwrap_or(next_cursor.clone()), }, Some(vec![EmitStatus(SubscribeStatus::Connected)]), )), @@ -339,10 +401,12 @@ impl SubscribeState { Self::Handshaking { channels, channel_groups, + cursor, } => Some(self.transition_to( Self::HandshakeReconnecting { channels: channels.clone(), channel_groups: channel_groups.clone(), + cursor: cursor.clone(), attempts: 1, reason: reason.clone(), }, @@ -364,12 +428,14 @@ impl SubscribeState { Self::HandshakeReconnecting { channels, channel_groups, + cursor, attempts, .. } => Some(self.transition_to( Self::HandshakeReconnecting { channels: channels.clone(), channel_groups: channel_groups.clone(), + cursor: cursor.clone(), attempts: attempts + 1, reason: reason.clone(), }, @@ -391,14 +457,16 @@ impl SubscribeState { Self::HandshakeReconnecting { channels, channel_groups, + cursor, .. } => Some(self.transition_to( Self::HandshakeFailed { channels: channels.clone(), channel_groups: channel_groups.clone(), + cursor: cursor.clone(), reason: reason.clone(), }, - Some(vec![EmitStatus(SubscribeStatus::ConnectedError( + Some(vec![EmitStatus(SubscribeStatus::ConnectionError( reason.clone(), ))]), )), @@ -431,10 +499,7 @@ impl SubscribeState { channel_groups: channel_groups.clone(), cursor: cursor.clone(), }, - Some(vec![ - EmitMessages(messages.to_vec()), - EmitStatus(SubscribeStatus::Connected), - ]), + Some(vec![EmitMessages(messages.to_vec())]), )), _ => None, } @@ -530,15 +595,18 @@ impl SubscribeState { Self::Handshaking { channels, channel_groups, + cursor, } | Self::HandshakeReconnecting { channels, channel_groups, + cursor, .. } => Some(self.transition_to( Self::HandshakeStopped { channels: channels.clone(), channel_groups: channel_groups.clone(), + cursor: cursor.clone(), }, None, )), @@ -574,16 +642,18 @@ impl SubscribeState { Self::HandshakeStopped { channels, channel_groups, - .. + cursor, } | Self::HandshakeFailed { channels, channel_groups, + cursor, .. } => Some(self.transition_to( Self::Handshaking { channels: channels.clone(), channel_groups: channel_groups.clone(), + cursor: cursor.clone(), }, None, )), @@ -598,10 +668,10 @@ impl SubscribeState { cursor, .. } => Some(self.transition_to( - Self::Receiving { + Self::Handshaking { channels: channels.clone(), channel_groups: channel_groups.clone(), - cursor: cursor.clone(), + cursor: Some(cursor.clone()), }, None, )), @@ -628,18 +698,22 @@ impl State for SubscribeState { SubscribeState::Handshaking { channels, channel_groups, + cursor, } => Some(vec![Handshake { channels: channels.clone(), channel_groups: channel_groups.clone(), + cursor: cursor.clone(), }]), Self::HandshakeReconnecting { channels, channel_groups, + cursor, attempts, reason, } => Some(vec![HandshakeReconnect { channels: channels.clone(), channel_groups: channel_groups.clone(), + cursor: cursor.clone(), attempts: *attempts, reason: reason.clone(), }]), @@ -807,7 +881,8 @@ mod should { }, SubscribeState::Handshaking { channels: Some(vec!["ch1".to_string()]), - channel_groups: Some(vec!["gr1".to_string()]) + channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, }; "to handshaking on subscription changed" )] @@ -818,10 +893,10 @@ mod should { channel_groups: Some(vec!["gr1".to_string()]), cursor: SubscribeCursor { timetoken: "10".into(), region: 1 } }, - SubscribeState::Receiving { + SubscribeState::Handshaking { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), - cursor: SubscribeCursor { timetoken: "10".into(), region: 1 } + cursor: Some(SubscribeCursor { timetoken: "10".into(), region: 1 }) }; "to handshaking on subscription restored" )] @@ -851,7 +926,8 @@ mod should { #[test_case( SubscribeState::Handshaking { channels: Some(vec!["ch1".to_string()]), - channel_groups: Some(vec!["gr1".to_string()]) + channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, }, SubscribeEvent::SubscriptionChanged { channels: Some(vec!["ch2".to_string()]), @@ -859,14 +935,33 @@ mod should { }, SubscribeState::Handshaking { channels: Some(vec!["ch2".to_string()]), - channel_groups: Some(vec!["gr2".to_string()]) + channel_groups: Some(vec!["gr2".to_string()]), + cursor: None, }; "to handshaking on subscription changed" )] #[test_case( SubscribeState::Handshaking { channels: Some(vec!["ch1".to_string()]), - channel_groups: Some(vec!["gr1".to_string()]) + channel_groups: Some(vec!["gr1".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + }, + SubscribeEvent::SubscriptionChanged { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + }, + SubscribeState::Handshaking { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + }; + "to handshaking with custom cursor on subscription changed" + )] + #[test_case( + SubscribeState::Handshaking { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, }, SubscribeEvent::HandshakeFailure { reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, @@ -874,6 +969,7 @@ mod should { SubscribeState::HandshakeReconnecting { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, attempts: 1, reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, }; @@ -882,19 +978,54 @@ mod should { #[test_case( SubscribeState::Handshaking { channels: Some(vec!["ch1".to_string()]), - channel_groups: Some(vec!["gr1".to_string()]) + channel_groups: Some(vec!["gr1".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + }, + SubscribeEvent::HandshakeFailure { + reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, + }, + SubscribeState::HandshakeReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + attempts: 1, + reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, + }; + "to handshake reconnect with custom cursor on handshake failure" + )] + #[test_case( + SubscribeState::Handshaking { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, }, SubscribeEvent::Disconnect, SubscribeState::HandshakeStopped { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, }; "to handshake stopped on disconnect" )] #[test_case( SubscribeState::Handshaking { channels: Some(vec!["ch1".to_string()]), - channel_groups: Some(vec!["gr1".to_string()]) + channel_groups: Some(vec!["gr1".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + }, + SubscribeEvent::Disconnect, + SubscribeState::HandshakeStopped { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + }; + "to handshake stopped with custom cursor on disconnect" + )] + #[test_case( + SubscribeState::Handshaking { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, }, SubscribeEvent::HandshakeSuccess { cursor: SubscribeCursor { timetoken: "10".into(), region: 1 } @@ -909,31 +1040,68 @@ mod should { #[test_case( SubscribeState::Handshaking { channels: Some(vec!["ch1".to_string()]), - channel_groups: Some(vec!["gr1".to_string()]) + channel_groups: Some(vec!["gr1".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + }, + SubscribeEvent::HandshakeSuccess { + cursor: SubscribeCursor { timetoken: "10".into(), region: 1 } + }, + SubscribeState::Receiving { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: "20".into(), region: 1 } + }; + "to receiving with custom cursor on handshake success" + )] + #[test_case( + SubscribeState::Handshaking { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, }, SubscribeEvent::SubscriptionRestored { channels: Some(vec!["ch2".to_string()]), channel_groups: Some(vec!["gr2".to_string()]), - cursor: SubscribeCursor { timetoken: "10".into(), region: 1 } + cursor: SubscribeCursor { timetoken: "10".into(), region: 1 }, }, - SubscribeState::Receiving { + SubscribeState::Handshaking { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "10".into(), region: 1 }), + }; + "to handshaking on subscription restored" + )] + #[test_case( + SubscribeState::Handshaking { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + }, + SubscribeEvent::SubscriptionRestored { channels: Some(vec!["ch2".to_string()]), channel_groups: Some(vec!["gr2".to_string()]), cursor: SubscribeCursor { timetoken: "10".into(), region: 1 } + }, + SubscribeState::Handshaking { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), }; - "to receiving on subscription restored" + "to handshaking with custom cursor on subscription restored" )] #[test_case( SubscribeState::Handshaking { channels: Some(vec!["ch1".to_string()]), - channel_groups: Some(vec!["gr1".to_string()]) + channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, }, SubscribeEvent::HandshakeReconnectGiveUp { reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, } }, SubscribeState::Handshaking { channels: Some(vec!["ch1".to_string()]), - channel_groups: Some(vec!["gr1".to_string()]) + channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, }; "to not change on unexpected event" )] @@ -955,6 +1123,7 @@ mod should { SubscribeState::HandshakeReconnecting { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, attempts: 1, reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, }, @@ -964,6 +1133,7 @@ mod should { SubscribeState::HandshakeReconnecting { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, attempts: 2, reason: PubNubError::Transport { details: "Test reason on error".to_string(), response: None, }, }; @@ -973,6 +1143,27 @@ mod should { SubscribeState::HandshakeReconnecting { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + attempts: 1, + reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, + }, + SubscribeEvent::HandshakeReconnectFailure { + reason: PubNubError::Transport { details: "Test reason on error".to_string(), response: None, }, + }, + SubscribeState::HandshakeReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + attempts: 2, + reason: PubNubError::Transport { details: "Test reason on error".to_string(), response: None, }, + }; + "to handshake reconnecting with custom cursor on reconnect failure" + )] + #[test_case( + SubscribeState::HandshakeReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, attempts: 1, reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, }, @@ -983,6 +1174,7 @@ mod should { SubscribeState::Handshaking { channels: Some(vec!["ch2".to_string()]), channel_groups: Some(vec!["gr2".to_string()]), + cursor: None, }; "to handshaking on subscription change" )] @@ -990,6 +1182,26 @@ mod should { SubscribeState::HandshakeReconnecting { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + attempts: 1, + reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, + }, + SubscribeEvent::SubscriptionChanged { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + }, + SubscribeState::Handshaking { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + }; + "to handshaking with custom cursor on subscription change" + )] + #[test_case( + SubscribeState::HandshakeReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, attempts: 1, reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, }, @@ -997,6 +1209,7 @@ mod should { SubscribeState::HandshakeStopped { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, }; "to handshake stopped on disconnect" )] @@ -1004,6 +1217,23 @@ mod should { SubscribeState::HandshakeReconnecting { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + attempts: 1, + reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, + }, + SubscribeEvent::Disconnect, + SubscribeState::HandshakeStopped { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + }; + "to handshake stopped with custom cursor on disconnect" + )] + #[test_case( + SubscribeState::HandshakeReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, attempts: 1, reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, }, @@ -1013,6 +1243,7 @@ mod should { SubscribeState::HandshakeFailed { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, reason: PubNubError::Transport { details: "Test give up reason".to_string(), response: None, } }; "to handshake failed on give up" @@ -1021,6 +1252,26 @@ mod should { SubscribeState::HandshakeReconnecting { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + attempts: 1, + reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, + }, + SubscribeEvent::HandshakeReconnectGiveUp { + reason: PubNubError::Transport { details: "Test give up reason".to_string(), response: None, } + }, + SubscribeState::HandshakeFailed { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + reason: PubNubError::Transport { details: "Test give up reason".to_string(), response: None, } + }; + "to handshake failed with custom cursor on give up" + )] + #[test_case( + SubscribeState::HandshakeReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, attempts: 1, reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, }, @@ -1038,6 +1289,25 @@ mod should { SubscribeState::HandshakeReconnecting { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + attempts: 1, + reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, + }, + SubscribeEvent::HandshakeReconnectSuccess { + cursor: SubscribeCursor { timetoken: "10".into(), region: 1 } + }, + SubscribeState::Receiving { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: "20".into(), region: 1 } + }; + "to receiving with custom cursor on reconnect success" + )] + #[test_case( + SubscribeState::HandshakeReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, attempts: 1, reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, }, @@ -1046,17 +1316,38 @@ mod should { channel_groups: Some(vec!["gr2".to_string()]), cursor: SubscribeCursor { timetoken: "10".into(), region: 1 } }, - SubscribeState::Receiving { + SubscribeState::Handshaking { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "10".into(), region: 1 }) + }; + "to handshaking on subscription restored" + )] + #[test_case( + SubscribeState::HandshakeReconnecting { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + attempts: 1, + reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, + }, + SubscribeEvent::SubscriptionRestored { channels: Some(vec!["ch2".to_string()]), channel_groups: Some(vec!["gr2".to_string()]), cursor: SubscribeCursor { timetoken: "10".into(), region: 1 } + }, + SubscribeState::Handshaking { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), }; - "to receiving on subscription restored" + "to handshaking with custom cursor on subscription restored" )] #[test_case( SubscribeState::HandshakeReconnecting { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, attempts: 1, reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, }, @@ -1067,6 +1358,7 @@ mod should { SubscribeState::HandshakeReconnecting { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, attempts: 1, reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, }; @@ -1090,6 +1382,7 @@ mod should { SubscribeState::HandshakeFailed { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, }, SubscribeEvent::SubscriptionChanged { @@ -1099,6 +1392,7 @@ mod should { SubscribeState::Handshaking { channels: Some(vec!["ch2".to_string()]), channel_groups: Some(vec!["gr2".to_string()]), + cursor: None, }; "to handshaking on subscription changed" )] @@ -1106,12 +1400,32 @@ mod should { SubscribeState::HandshakeFailed { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, + }, + SubscribeEvent::SubscriptionChanged { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + }, + SubscribeState::Handshaking { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + }; + "to handshaking with custom cursor on subscription changed" + )] + #[test_case( + SubscribeState::HandshakeFailed { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, }, SubscribeEvent::Reconnect, SubscribeState::Handshaking { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, }; "to handshaking on reconnect" )] @@ -1119,6 +1433,22 @@ mod should { SubscribeState::HandshakeFailed { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, + }, + SubscribeEvent::Reconnect, + SubscribeState::Handshaking { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + }; + "to handshaking with custom cursor on reconnect" + )] + #[test_case( + SubscribeState::HandshakeFailed { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, }, SubscribeEvent::SubscriptionRestored { @@ -1126,17 +1456,37 @@ mod should { channel_groups: Some(vec!["gr2".to_string()]), cursor: SubscribeCursor { timetoken: "10".into(), region: 1 } }, - SubscribeState::Receiving { + SubscribeState::Handshaking { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "10".into(), region: 1 }) + }; + "to handshaking on subscription restored" + )] + #[test_case( + SubscribeState::HandshakeFailed { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, + }, + SubscribeEvent::SubscriptionRestored { channels: Some(vec!["ch2".to_string()]), channel_groups: Some(vec!["gr2".to_string()]), cursor: SubscribeCursor { timetoken: "10".into(), region: 1 } + }, + SubscribeState::Handshaking { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }) }; - "to receiving on subscription restored" + "to handshaking with custom cursor on subscription restored" )] #[test_case( SubscribeState::HandshakeFailed { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, }, SubscribeEvent::ReceiveSuccess { @@ -1146,6 +1496,7 @@ mod should { SubscribeState::HandshakeFailed { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, reason: PubNubError::Transport { details: "Test reason".to_string(), response: None, }, }; "to not change on unexpected event" @@ -1168,11 +1519,13 @@ mod should { SubscribeState::HandshakeStopped { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, }, SubscribeEvent::Reconnect, SubscribeState::Handshaking { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, }; "to handshaking on reconnect" )] @@ -1180,6 +1533,57 @@ mod should { SubscribeState::HandshakeStopped { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + }, + SubscribeEvent::Reconnect, + SubscribeState::Handshaking { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + }; + "to handshaking with custom cursor on reconnect" + )] + #[test_case( + SubscribeState::HandshakeStopped { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, + }, + SubscribeEvent::SubscriptionRestored { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: SubscribeCursor { timetoken: "10".into(), region: 1 } + }, + SubscribeState::Handshaking { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "10".into(), region: 1 }) + }; + "to handshaking on subscription restored" + )] + #[test_case( + SubscribeState::HandshakeStopped { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + }, + SubscribeEvent::SubscriptionRestored { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: SubscribeCursor { timetoken: "10".into(), region: 1 } + }, + SubscribeState::Handshaking { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "20".into(), region: 1 }), + }; + "to handshaking with custom cursor on subscription restored" + )] + #[test_case( + SubscribeState::HandshakeStopped { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, }, SubscribeEvent::ReceiveSuccess { cursor: SubscribeCursor { timetoken: "10".into(), region: 1 }, @@ -1188,6 +1592,7 @@ mod should { SubscribeState::HandshakeStopped { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), + cursor: None, }; "to not change on unexpected event" )] @@ -1458,12 +1863,12 @@ mod should { channels: Some(vec!["ch2".to_string()]), channel_groups: Some(vec!["gr2".to_string()]), }, - SubscribeState::Receiving { + SubscribeState::Handshaking { channels: Some(vec!["ch2".to_string()]), channel_groups: Some(vec!["gr2".to_string()]), - cursor: SubscribeCursor { timetoken: "10".into(), region: 1 }, + cursor: Some(SubscribeCursor { timetoken: "10".into(), region: 1 }), }; - "to receiving on subscription changed" + "to handshaking on subscription changed" )] #[test_case( SubscribeState::ReceiveFailed { @@ -1477,12 +1882,12 @@ mod should { channel_groups: Some(vec!["gr2".to_string()]), cursor: SubscribeCursor { timetoken: "100".into(), region: 1 }, }, - SubscribeState::Receiving { + SubscribeState::Handshaking { channels: Some(vec!["ch2".to_string()]), channel_groups: Some(vec!["gr2".to_string()]), - cursor: SubscribeCursor { timetoken: "100".into(), region: 1 }, + cursor: Some(SubscribeCursor { timetoken: "100".into(), region: 1 }), }; - "to receiving on subscription restored" + "to handshaking on subscription restored" )] #[test_case( SubscribeState::ReceiveFailed { @@ -1492,12 +1897,12 @@ mod should { reason: PubNubError::Transport { details: "Test error".to_string(), response: None, } }, SubscribeEvent::Reconnect, - SubscribeState::Receiving { + SubscribeState::Handshaking { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), - cursor: SubscribeCursor { timetoken: "10".into(), region: 1 }, + cursor: Some(SubscribeCursor { timetoken: "10".into(), region: 1 }), }; - "to receiving on reconnect" + "to handshaking on reconnect" )] #[test_case( SubscribeState::ReceiveFailed { @@ -1530,6 +1935,7 @@ mod should { assert_eq!(engine.current_state(), target_state); } + #[test_case( SubscribeState::ReceiveStopped { channels: Some(vec!["ch1".to_string()]), @@ -1537,12 +1943,47 @@ mod should { cursor: SubscribeCursor { timetoken: "10".into(), region: 1 }, }, SubscribeEvent::Reconnect, - SubscribeState::Receiving { + SubscribeState::Handshaking { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: Some(SubscribeCursor { timetoken: "10".into(), region: 1 }), + }; + "to handshaking on reconnect" + )] + #[test_case( + SubscribeState::ReceiveStopped { channels: Some(vec!["ch1".to_string()]), channel_groups: Some(vec!["gr1".to_string()]), cursor: SubscribeCursor { timetoken: "10".into(), region: 1 }, + }, + SubscribeEvent::SubscriptionChanged { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + }, + SubscribeState::ReceiveStopped { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: SubscribeCursor { timetoken: "10".into(), region: 1 }, + }; + "to receive stopped on subscription changed" + )] + #[test_case( + SubscribeState::ReceiveStopped { + channels: Some(vec!["ch1".to_string()]), + channel_groups: Some(vec!["gr1".to_string()]), + cursor: SubscribeCursor { timetoken: "10".into(), region: 1 }, + }, + SubscribeEvent::SubscriptionRestored { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: SubscribeCursor { timetoken: "100".into(), region: 1 }, + }, + SubscribeState::ReceiveStopped { + channels: Some(vec!["ch2".to_string()]), + channel_groups: Some(vec!["gr2".to_string()]), + cursor: SubscribeCursor { timetoken: "100".into(), region: 1 }, }; - "to receiving on reconnect" + "to receive stopped on subscription restored" )] #[test_case( SubscribeState::ReceiveStopped { diff --git a/src/dx/subscribe/result.rs b/src/dx/subscribe/result.rs index fe6b6c93..95dc3f6f 100644 --- a/src/dx/subscribe/result.rs +++ b/src/dx/subscribe/result.rs @@ -253,8 +253,8 @@ pub struct Envelope { /// channels to [`subscribe`] method call. /// /// [`subscribe`]: crate::dx::subscribe - #[cfg_attr(feature = "serde", serde(rename = "b"))] - pub subscription: String, + #[cfg_attr(feature = "serde", serde(rename = "b"), serde(default))] + pub subscription: Option, /// User provided message type (set only when [`publish`] called with /// `r#type`). @@ -552,13 +552,13 @@ impl Update { /// Name of channel. /// /// Name of channel at which update has been received. - pub(crate) fn channel_group(&self) -> String { + pub(crate) fn channel_group(&self) -> Option { match self { Update::Presence(presence) => presence.channel_group(), Update::Object(object) => object.channel_group(), - Update::MessageAction(action) => action.subscription.to_string(), - Update::File(file) => file.subscription.to_string(), - Update::Message(message) | Update::Signal(message) => message.subscription.to_string(), + Update::MessageAction(action) => action.subscription.clone(), + Update::File(file) => file.subscription.clone(), + Update::Message(message) | Update::Signal(message) => message.subscription.clone(), } } } diff --git a/src/dx/subscribe/subscription_manager.rs b/src/dx/subscribe/subscription_manager.rs index e9d1d464..31b87235 100644 --- a/src/dx/subscribe/subscription_manager.rs +++ b/src/dx/subscribe/subscription_manager.rs @@ -4,6 +4,7 @@ //! active subscription streams. use super::event_engine::SubscribeEvent; +use crate::subscribe::SubscribeCursor; use crate::{ dx::subscribe::{ event_engine::SubscribeEventEngine, result::Update, subscription::Subscription, @@ -56,9 +57,14 @@ impl SubscriptionManager { } pub fn register(&mut self, subscription: Subscription) { + let cursor = subscription.cursor; self.subscribers.push(subscription); - self.change_subscription(); + if let Some(cursor) = cursor { + self.restore_subscription(cursor); + } else { + self.change_subscription(); + } } pub fn unregister(&mut self, subscription: Subscription) { @@ -94,6 +100,32 @@ impl SubscriptionManager { channel_groups: (!channel_groups.is_empty()).then_some(channel_groups), }); } + + fn restore_subscription(&self, cursor: u64) { + let channels = self + .subscribers + .iter() + .flat_map(|val| val.channels.iter()) + .cloned() + .collect::>(); + + let channel_groups = self + .subscribers + .iter() + .flat_map(|val| val.channel_groups.iter()) + .cloned() + .collect::>(); + + self.subscribe_event_engine + .process(&SubscribeEvent::SubscriptionRestored { + channels: (!channels.is_empty()).then_some(channels), + channel_groups: (!channel_groups.is_empty()).then_some(channel_groups), + cursor: SubscribeCursor { + timetoken: cursor.to_string(), + region: 0, + }, + }); + } } #[cfg(test)] diff --git a/src/dx/subscribe/types.rs b/src/dx/subscribe/types.rs index e376b3c1..3d5e2e07 100644 --- a/src/dx/subscribe/types.rs +++ b/src/dx/subscribe/types.rs @@ -104,15 +104,15 @@ pub enum SubscribeStatus { Disconnected, /// Connection attempt failed. - ConnectedError(PubNubError), + ConnectionError(PubNubError), } /// Presence update information. /// -/// Enum provides [`Presence::Join`], [`Presence::Leave`], [`Presence::Timeout`], -/// [`Presence::Interval`] and [`Presence::StateChange`] variants for updates -/// listener. These variants allow listener understand how presence changes on -/// channel. +/// Enum provides [`Presence::Join`], [`Presence::Leave`], +/// [`Presence::Timeout`], [`Presence::Interval`] and [`Presence::StateChange`] +/// variants for updates listener. These variants allow listener understand how +/// presence changes on channel. #[derive(Debug, Clone)] pub enum Presence { /// Remote user `join` update. @@ -130,7 +130,10 @@ pub enum Presence { /// Actual name of subscription through which `user joined` update has /// been delivered. - subscription: String, + /// + /// Will be always `None` except case when update has been delivered + /// not from the channel but from the group. + subscription: Option, /// Current channel occupancy after user joined. occupancy: usize, @@ -148,7 +151,10 @@ pub enum Presence { /// Actual name of subscription through which `user left` update has /// been delivered. - subscription: String, + /// + /// Will be always `None` except case when update has been delivered + /// not from the channel but from the group. + subscription: Option, /// Current channel occupancy after user left. occupancy: usize, @@ -169,7 +175,10 @@ pub enum Presence { /// Actual name of subscription through which `user timeout` update has /// been delivered. - subscription: String, + /// + /// Will be always `None` except case when update has been delivered + /// not from the channel but from the group. + subscription: Option, /// Current channel occupancy after user timeout. occupancy: usize, @@ -191,7 +200,10 @@ pub enum Presence { /// Actual name of subscription through which `interval` update has been /// delivered. - subscription: String, + /// + /// Will be always `None` except case when update has been delivered + /// not from the channel but from the group. + subscription: Option, /// Current channel occupancy. occupancy: usize, @@ -222,7 +234,10 @@ pub enum Presence { /// Actual name of subscription through which `state changed` update has /// been delivered. - subscription: String, + /// + /// Will be always `None` except case when update has been delivered + /// not from the channel but from the group. + subscription: Option, /// Unique identification of the user for which state has been changed. uuid: String, @@ -274,7 +289,10 @@ pub enum Object { /// Actual name of subscription through which `channel object` update /// has been delivered. - subscription: String, + /// + /// Will be always `None` except case when update has been delivered + /// not from the channel but from the group. + subscription: Option, }, /// `UUID` object update. @@ -318,7 +336,10 @@ pub enum Object { /// Actual name of subscription through which `uuid object` update has /// been delivered. - subscription: String, + /// + /// Will be always `None` except case when update has been delivered + /// not from the channel but from the group. + subscription: Option, }, /// `Membership` object update. @@ -351,7 +372,10 @@ pub enum Object { /// Actual name of subscription through which `membership` update has /// been delivered. - subscription: String, + /// + /// Will be always `None` except case when update has been delivered + /// not from the channel but from the group. + subscription: Option, }, } @@ -370,7 +394,10 @@ pub struct Message { pub channel: String, /// Actual name of subscription through which update has been delivered. - pub subscription: String, + /// + /// Will be always `None` except case when update has been delivered + /// not from the channel but from the group. + pub subscription: Option, /// Data published along with message / signal. pub data: Vec, @@ -413,7 +440,10 @@ pub struct MessageAction { pub channel: String, /// Actual name of subscription through which update has been delivered. - pub subscription: String, + /// + /// Will be always `None` except case when update has been delivered + /// not from the channel but from the group. + pub subscription: Option, /// Timetoken of message for which action has been added / removed. pub message_timetoken: String, @@ -445,7 +475,10 @@ pub struct File { pub channel: String, /// Actual name of subscription through which update has been delivered. - pub subscription: String, + /// + /// Will be always `None` except case when update has been delivered + /// not from the channel but from the group. + pub subscription: Option, /// Message which has been associated with uploaded file. message: String, @@ -533,7 +566,7 @@ impl core::fmt::Display for SubscribeStatus { Self::Connected => write!(f, "Connected"), Self::Reconnected => write!(f, "Reconnected"), Self::Disconnected => write!(f, "Disconnected"), - Self::ConnectedError(err) => write!(f, "ConnectionError({err:?})"), + Self::ConnectionError(err) => write!(f, "ConnectionError({err:?})"), } } } @@ -560,17 +593,18 @@ impl Presence { /// Presence update channel group. /// /// Name of channel group through which presence update has been triggered. - pub(crate) fn channel_group(&self) -> String { + pub(crate) fn channel_group(&self) -> Option { match self { Presence::Join { subscription, .. } | Presence::Leave { subscription, .. } | Presence::Timeout { subscription, .. } | Presence::Interval { subscription, .. } - | Presence::StateChange { subscription, .. } => subscription - .split('-') - .last() - .map(|name| name.to_string()) - .unwrap_or(subscription.to_string()), + | Presence::StateChange { subscription, .. } => subscription.clone().map(|s| { + s.split('-') + .last() + .map(|name| name.to_string()) + .unwrap_or(s) + }), } } } @@ -590,11 +624,11 @@ impl Object { /// Object channel group name. /// /// Name of channel group through which object update has been triggered. - pub(crate) fn channel_group(&self) -> String { + pub(crate) fn channel_group(&self) -> Option { match self { Object::Channel { subscription, .. } | Object::Uuid { subscription, .. } - | Object::Membership { subscription, .. } => subscription.to_string(), + | Object::Membership { subscription, .. } => subscription.clone(), } } } @@ -656,14 +690,17 @@ impl TryFrom for Presence { } = value.payload { let action = action.unwrap_or("interval".to_string()); + + let subscription = resolve_subscription_value(value.subscription, &value.channel); + match action.as_str() { "join" => Ok(Self::Join { timestamp, // `join` event always has `uuid` and unwrap_or default // value won't be actually used. uuid: uuid.unwrap_or("".to_string()), - channel: value.channel, - subscription: value.subscription, + channel: value.channel.clone(), + subscription, occupancy: occupancy.unwrap_or(0), }), "leave" => Ok(Self::Leave { @@ -671,8 +708,8 @@ impl TryFrom for Presence { // `leave` event always has `uuid` and unwrap_or default // value won't be actually used. uuid: uuid.unwrap_or("".to_string()), - channel: value.channel, - subscription: value.subscription, + channel: value.channel.clone(), + subscription, occupancy: occupancy.unwrap_or(0), }), "timeout" => Ok(Self::Timeout { @@ -680,14 +717,14 @@ impl TryFrom for Presence { // `leave` event always has `uuid` and unwrap_or default // value won't be actually used. uuid: uuid.unwrap_or("".to_string()), - channel: value.channel, - subscription: value.subscription, + channel: value.channel.clone(), + subscription, occupancy: occupancy.unwrap_or(0), }), "interval" => Ok(Self::Interval { timestamp, - channel: value.channel, - subscription: value.subscription, + channel: value.channel.clone(), + subscription, occupancy: occupancy.unwrap_or(0), join, leave, @@ -698,8 +735,8 @@ impl TryFrom for Presence { // `state-change` event always has `uuid` and unwrap_or // default value won't be actually used. uuid: uuid.unwrap_or("".to_string()), - channel: value.channel, - subscription: value.subscription, + channel: value.channel.clone(), + subscription, data, }), } @@ -724,6 +761,8 @@ impl TryFrom for Object { } = value.payload { let update_type = r#type; + let subscription = resolve_subscription_value(value.subscription, &value.channel); + match data { ObjectDataBody::Channel { name, @@ -745,7 +784,7 @@ impl TryFrom for Object { custom, updated, tag, - subscription: value.subscription, + subscription, }), ObjectDataBody::Uuid { name, @@ -771,7 +810,7 @@ impl TryFrom for Object { custom, updated, tag, - subscription: value.subscription, + subscription, }), ObjectDataBody::Membership { channel, @@ -806,14 +845,14 @@ impl TryFrom for Object { custom: channel_custom, updated: channel_updated, tag: channel_tag, - subscription: value.subscription.clone(), + subscription: subscription.clone(), }), custom, status, uuid, updated, tag, - subscription: value.subscription, + subscription, }) } else { Err(PubNubError::Deserialization { @@ -840,13 +879,14 @@ impl TryFrom for Message { // `Message` / `signal` always has `timetoken` and unwrap_or default // value won't be actually used. let timestamp = value.published.timetoken.parse::().ok().unwrap_or(0); + let subscription = resolve_subscription_value(value.subscription, &value.channel); if let EnvelopePayload::Message(_) = value.payload { Ok(Self { sender: value.sender, timestamp, channel: value.channel, - subscription: value.subscription, + subscription, data: value.payload.into(), r#type: value.r#type, space_id: value.space_id, @@ -870,13 +910,16 @@ impl TryFrom for MessageAction { // `Message action` event always has `sender` and unwrap_or default // value won't be actually used. let sender = value.sender.unwrap_or("".to_string()); + + let subscription = resolve_subscription_value(value.subscription, &value.channel); + if let EnvelopePayload::MessageAction { event, data, .. } = value.payload { Ok(Self { event: event.try_into()?, sender, timestamp, channel: value.channel, - subscription: value.subscription, + subscription, message_timetoken: data.message_timetoken, action_timetoken: data.action_timetoken, r#type: data.r#type, @@ -900,12 +943,15 @@ impl TryFrom for File { // `File` event always has `sender` and unwrap_or default // value won't be actually used. let sender = value.sender.unwrap_or("".to_string()); + + let subscription = resolve_subscription_value(value.subscription, &value.channel); + if let EnvelopePayload::File { message, file } = value.payload { Ok(Self { sender, timestamp, - channel: value.channel, - subscription: value.subscription, + channel: value.channel.clone(), + subscription, message, id: file.id, name: file.name, @@ -917,3 +963,24 @@ impl TryFrom for File { } } } + +fn resolve_subscription_value(subscription: Option, channel: &str) -> Option { + subscription.and_then(|s| s.ne(channel).then_some(s)) +} + +// TODO: add tests for complicated froms. +#[cfg(test)] +mod should { + use super::*; + use test_case::test_case; + + #[test_case(None, "channel" => None; "no subscription")] + #[test_case(Some("channel".into()), "channel" => None; "same subscription and channel")] + #[test_case(Some("channel".into()), "channel2" => Some("channel".into()); "different subscription and channel")] + fn resolve_subscription_field_value( + subscription: Option, + channel: &str, + ) -> Option { + resolve_subscription_value(subscription, channel) + } +} diff --git a/tests/contract_test.rs b/tests/contract_test.rs index 6f1cc379..77b96fc9 100644 --- a/tests/contract_test.rs +++ b/tests/contract_test.rs @@ -104,7 +104,8 @@ async fn main() { let _ = std::fs::create_dir_all("tests/reports"); let file: File = File::create("tests/reports/report-required.xml").unwrap(); PubNubWorld::cucumber() - .max_concurrent_scenarios(1) // sequential execution because tomato waits for a specific request at a time for which a script is initialised. + .max_concurrent_scenarios(1) // sequential execution because tomato waits for a specific request at a time for which a + // script is initialised. .before(|_feature, _rule, scenario, world| { world.scenario = Some(scenario.clone());