Skip to content

Commit

Permalink
Merge branch 'master' into chore/add-code-owner
Browse files Browse the repository at this point in the history
  • Loading branch information
Xavrax authored Aug 28, 2023
2 parents 9dbe5f2 + 352f528 commit 2c20f3a
Show file tree
Hide file tree
Showing 12 changed files with 700 additions and 126 deletions.
4 changes: 3 additions & 1 deletion src/dx/subscribe/builders/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,9 @@ impl Subscription {

fn subscribed_for_update(&self, update: &Update) -> bool {
self.channels.contains(&update.channel())
|| self.channel_groups.contains(&update.channel_group())
|| update
.channel_group()
.is_some_and(|g| self.channel_groups.contains(&g))
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/dx/subscribe/event_engine/effect_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,24 @@ impl EffectHandler<SubscribeEffectInvocation, SubscribeEffect> for SubscribeEffe
SubscribeEffectInvocation::Handshake {
channels,
channel_groups,
cursor,
} => Some(SubscribeEffect::Handshake {
channels: channels.clone(),
channel_groups: channel_groups.clone(),
cursor: cursor.clone(),
executor: self.subscribe_call.clone(),
cancellation_channel: self.cancellation_channel.clone(),
}),
SubscribeEffectInvocation::HandshakeReconnect {
channels,
channel_groups,
cursor,
attempts,
reason,
} => Some(SubscribeEffect::HandshakeReconnect {
channels: channels.clone(),
channel_groups: channel_groups.clone(),
cursor: cursor.clone(),
attempts: *attempts,
reason: reason.clone(),
retry_policy: self.retry_policy.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/dx/subscribe/event_engine/effects/emit_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ mod should {
sender: Some("test-user".into()),
timestamp: 1234567890,
channel: "test".to_string(),
subscription: "test-group".to_string(),
subscription: Some("test-group".to_string()),
data: vec![],
r#type: None,
space_id: None,
Expand Down
2 changes: 1 addition & 1 deletion src/dx/subscribe/event_engine/effects/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ mod should {
let mock_handshake_function: Arc<SubscribeEffectExecutor> = Arc::new(move |params| {
assert_eq!(params.channels, &Some(vec!["ch1".to_string()]));
assert_eq!(params.channel_groups, &Some(vec!["cg1".to_string()]));
assert_eq!(params.attempt, 0);
assert_eq!(params.cursor, None);
assert_eq!(params.attempt, 0);
assert_eq!(params.reason, None);
assert_eq!(params.effect_id, "id");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ mod should {
let mock_handshake_function: Arc<SubscribeEffectExecutor> = Arc::new(move |params| {
assert_eq!(params.channels, &Some(vec!["ch1".to_string()]));
assert_eq!(params.channel_groups, &Some(vec!["cg1".to_string()]));
assert_eq!(params.attempt, 1);
assert_eq!(params.cursor, None);
assert_eq!(params.attempt, 1);
assert_eq!(
params.reason.unwrap(),
PubNubError::Transport {
Expand Down
21 changes: 18 additions & 3 deletions src/dx/subscribe/event_engine/effects/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ pub(crate) enum SubscribeEffect {
/// after initial subscription completion.
channel_groups: Option<Vec<String>>,

/// Time cursor.
///
/// Cursor used by subscription loop to identify point in time after
/// which updates will be delivered.
cursor: Option<SubscribeCursor>,

/// Executor function.
///
/// Function which will be used to execute initial subscription.
Expand All @@ -70,6 +76,12 @@ pub(crate) enum SubscribeEffect {
/// initial subscription.
channel_groups: Option<Vec<String>>,

/// Time cursor.
///
/// Cursor used by subscription loop to identify point in time after
/// which updates will be delivered.
cursor: Option<SubscribeCursor>,

/// Current initial subscribe retry attempt.
///
/// Used to track overall number of initial subscription retry attempts.
Expand Down Expand Up @@ -280,7 +292,8 @@ impl Effect for SubscribeEffect {
channels,
channel_groups,
*attempts,
reason.clone(), // TODO: Does run function need to borrow self? Or we can consume it?
reason.clone(), /* TODO: Does run function need to borrow self? Or we can
* consume it? */
&self.id(),
retry_policy,
executor,
Expand Down Expand Up @@ -309,7 +322,8 @@ impl Effect for SubscribeEffect {
channel_groups,
cursor,
*attempts,
reason.clone(), // TODO: Does run function need to borrow self? Or we can consume it?
reason.clone(), /* TODO: Does run function need to borrow self? Or we can
* consume it? */
&self.id(),
retry_policy,
executor,
Expand Down Expand Up @@ -357,12 +371,13 @@ mod should {
use super::*;

#[tokio::test]
async fn send_cancelation_notification() {
async fn send_cancellation_notification() {
let (tx, rx) = async_channel::bounded(1);

let effect = SubscribeEffect::Handshake {
channels: None,
channel_groups: None,
cursor: None,
executor: Arc::new(|_| {
Box::pin(async move {
Ok(SubscribeResult {
Expand Down
12 changes: 12 additions & 0 deletions src/dx/subscribe/event_engine/invocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ pub(crate) enum SubscribeEffectInvocation {
/// List of channel groups which will be source of real-time updates
/// after initial subscription completion.
channel_groups: Option<Vec<String>>,

/// Time cursor.
///
/// Cursor used by subscription loop to identify point in time after
/// which updates will be delivered.
cursor: Option<SubscribeCursor>,
},

/// Cancel initial subscribe effect invocation.
Expand All @@ -50,6 +56,12 @@ pub(crate) enum SubscribeEffectInvocation {
/// initial subscription.
channel_groups: Option<Vec<String>>,

/// Time cursor.
///
/// Cursor used by subscription loop to identify point in time after
/// which updates will be delivered.
cursor: Option<SubscribeCursor>,

/// Current initial subscribe retry attempt.
///
/// Used to track overall number of initial subscription retry attempts.
Expand Down
Loading

0 comments on commit 2c20f3a

Please sign in to comment.