diff --git a/.pubnub.yml b/.pubnub.yml index da0930e9..9eb1e8da 100644 --- a/.pubnub.yml +++ b/.pubnub.yml @@ -1,9 +1,28 @@ name: rust -version: 0.5.0 +version: 0.6.0 schema: 1 scm: github.com/pubnub/rust files: [] changelog: + - date: 2024-02-07 + version: 0.6.0 + changes: + - type: feature + text: "Make it possible to create `SubscriptionCursor` from the string slice." + - type: feature + text: "Add `add_subscriptions(..)` and `sub_subscriptions(..)` to `SubscriptionSet` to make it possible in addition to sets manipulation use list of subscriptions." + - type: bug + text: "Fix issue because of which `cursor` is not reset on `Subscription` and `SubscriptionSet` on unsubscribe." + - type: bug + text: "Fix issue because of which cancelled effects still asynchronously spawned for processing." + - type: improvement + text: "Change `client` to `pubnub` in inline docs." + - type: improvement + text: "Add subscription token validation." + - type: improvement + text: "Added a method to validate the provided subscription token to conform to PubNub time token requirements with precision." + - type: improvement + text: "Separate `subscribe` example into two to show separately `subscribe` feature and `presence state` maintenance with subscribe." - date: 2024-01-25 version: 0.5.0 changes: diff --git a/Cargo.toml b/Cargo.toml index afcc3265..7739d7ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pubnub" -version = "0.5.0" +version = "0.6.0" edition = "2021" license-file = "LICENSE" authors = ["PubNub "] @@ -165,6 +165,10 @@ required-features = ["default"] [[example]] name = "subscribe" +required-features = ["default", "subscribe"] + +[[example]] +name = "subscribe_with_presence_state" required-features = ["default", "subscribe", "presence"] [[example]] diff --git a/README.md b/README.md index 51ede15b..c125e259 100644 --- a/README.md +++ b/README.md @@ -36,11 +36,11 @@ Add `pubnub` to your Rust project in the `Cargo.toml` file: ```toml # default features [dependencies] -pubnub = "0.5.0" +pubnub = "0.6.0" # all features [dependencies] -pubnub = { version = "0.5.0", features = ["full"] } +pubnub = { version = "0.6.0", features = ["full"] } ``` ### Example @@ -48,17 +48,20 @@ pubnub = { version = "0.5.0", features = ["full"] } Try the following sample code to get up and running quickly! ```rust -use pubnub::{Keyset, PubNubClientBuilder}; -use pubnub::dx::subscribe::{SubscribeStreamEvent, Update}; +use pubnub::subscribe::Subscriber; use futures::StreamExt; use tokio::time::sleep; use std::time::Duration; use serde_json; - +use pubnub::{ + dx::subscribe::Update, + subscribe::EventSubscriber, + Keyset, PubNubClientBuilder, +}; #[tokio::main] async fn main() -> Result<(), Box> { use pubnub::subscribe::{EventEmitter, SubscriptionParams}; -let publish_key = "my_publish_key"; + let publish_key = "my_publish_key"; let subscribe_key = "my_subscribe_key"; let client = PubNubClientBuilder::with_reqwest_transport() .with_keyset(Keyset { @@ -68,6 +71,7 @@ let publish_key = "my_publish_key"; }) .with_user_id("user_id") .build()?; + println!("PubNub instance created"); let subscription = client.subscription(SubscriptionParams { @@ -76,7 +80,13 @@ let publish_key = "my_publish_key"; options: None }); - println!("Subscribed to channel"); + let channel_entity = client.channel("my_channel_2"); + let channel_entity_subscription = channel_entity.subscription(None); + + subscription.subscribe(); + channel_entity_subscription.subscribe(); + + println!("Subscribed to channels"); // Launch a new task to print out each received message tokio::spawn(client.status_stream().for_each(|status| async move { @@ -107,7 +117,21 @@ let publish_key = "my_publish_key"; } })); - sleep(Duration::from_secs(1)).await; + // Explicitly listen only for real-time `message` updates. + tokio::spawn( + channel_entity_subscription + .messages_stream() + .for_each(|message| async move { + if let Ok(utf8_message) = String::from_utf8(message.data.clone()) { + if let Ok(cleaned) = serde_json::from_str::(&utf8_message) { + println!("message: {}", cleaned); + } + } + }), + ); + + sleep(Duration::from_secs(2)).await; + // Send a message to the channel client .publish_message("hello world!") @@ -116,7 +140,15 @@ let publish_key = "my_publish_key"; .execute() .await?; - sleep(Duration::from_secs(10)).await; + // Send a message to another channel + client + .publish_message("hello world on the other channel!") + .channel("my_channel_2") + .r#type("text-message") + .execute() + .await?; + + sleep(Duration::from_secs(15)).await; Ok(()) } @@ -132,11 +164,11 @@ disable them in the `Cargo.toml` file, like so: ```toml # only blocking and access + default features [dependencies] -pubnub = { version = "0.5.0", features = ["blocking", "access"] } +pubnub = { version = "0.6.0", features = ["blocking", "access"] } # only parse_token + default features [dependencies] -pubnub = { version = "0.5.0", features = ["parse_token"] } +pubnub = { version = "0.6.0", features = ["parse_token"] } ``` ### Available features @@ -175,7 +207,7 @@ you need, for example: ```toml [dependencies] -pubnub = { version = "0.5.0", default-features = false, features = ["serde", "publish", +pubnub = { version = "0.6.0", default-features = false, features = ["serde", "publish", "blocking"] } ``` diff --git a/examples/subscribe.rs b/examples/subscribe.rs index fd9ab7c4..f89466f0 100644 --- a/examples/subscribe.rs +++ b/examples/subscribe.rs @@ -1,5 +1,3 @@ -use std::collections::HashMap; - use futures::StreamExt; use serde::Deserialize; use std::env; @@ -26,7 +24,7 @@ async fn main() -> Result<(), Box> { let publish_key = env::var("SDK_PUB_KEY")?; let subscribe_key = env::var("SDK_SUB_KEY")?; - let client = PubNubClientBuilder::with_reqwest_transport() + let pubnub = PubNubClientBuilder::with_reqwest_transport() .with_keyset(Keyset { subscribe_key, publish_key: Some(publish_key), @@ -40,36 +38,25 @@ async fn main() -> Result<(), Box> { println!("running!"); - client - .set_presence_state(HashMap::::from([ - ( - "is_doing".to_string(), - "Nothing... Just hanging around...".to_string(), - ), - ("flag".to_string(), "false".to_string()), - ])) - .channels(["my_channel".into(), "other_channel".into()].to_vec()) - .user_id("user_id") - .execute() - .await?; - tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; - let subscription = client.subscription(SubscriptionParams { + let subscription = pubnub.subscription(SubscriptionParams { channels: Some(&["my_channel", "other_channel"]), channel_groups: None, options: Some(vec![SubscriptionOptions::ReceivePresenceEvents]), }); - subscription.subscribe(None); + subscription.subscribe(); let subscription_clone = subscription.clone_empty(); // Attach connection status to the PubNub client instance. tokio::spawn( - client + pubnub .status_stream() .for_each(|status| async move { println!("\nstatus: {:?}", status) }), ); + // Example of the "global" listener for multiplexed subscription object from + // PubNub client. tokio::spawn(subscription.stream().for_each(|event| async move { match event { Update::Message(message) | Update::Signal(message) => { @@ -96,9 +83,11 @@ async fn main() -> Result<(), Box> { } })); - tokio::spawn(subscription_clone.stream().for_each(|event| async move { - match event { - Update::Message(message) | Update::Signal(message) => { + // Explicitly listen only for real-time `message` updates. + tokio::spawn( + subscription_clone + .messages_stream() + .for_each(|message| async move { // Deserialize the message payload as you wish match serde_json::from_slice::(&message.data) { Ok(message) => println!("(b) defined message: {:?}", message), @@ -106,25 +95,19 @@ async fn main() -> Result<(), Box> { println!("(b) other message: {:?}", String::from_utf8(message.data)) } } - } - Update::Presence(presence) => { - println!("(b) presence: {:?}", presence) - } - Update::AppContext(object) => { - println!("(b) object: {:?}", object) - } - Update::MessageAction(action) => { - println!("(b) message action: {:?}", action) - } - Update::File(file) => { - println!("(b) file: {:?}", file) - } - } - })); + }), + ); + + // Explicitly listen only for real-time `file` updates. + tokio::spawn( + subscription_clone + .files_stream() + .for_each(|file| async move { println!("(b) file: {:?}", file) }), + ); // Sleep for a minute. Now you can send messages to the channels // "my_channel" and "other_channel" and see them printed in the console. - // You can use the publish example or [PubNub console](https://www.pubnub.com/docs/console/) + // You can use the publishing example or [PubNub console](https://www.pubnub.com/docs/console/) // to send messages. tokio::time::sleep(tokio::time::Duration::from_secs(15)).await; @@ -132,12 +115,12 @@ async fn main() -> Result<(), Box> { // subscription.unsubscribe(); println!("\nDisconnect from the real-time data stream"); - client.disconnect(); + pubnub.disconnect(); tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; println!("\nReconnect to the real-time data stream"); - client.reconnect(None); + pubnub.reconnect(None); // Let event engine process unsubscribe request tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; @@ -147,9 +130,11 @@ async fn main() -> Result<(), Box> { // drop(subscription_clone); println!( - "\nUnsubscribe from all data streams. To restore requires `subscription.subscribe(None)` call." ); + "\nUnsubscribe from all data streams. To restore call `subscription.subscribe()` or \ + `subscription.subscribe_with_timetoken(Some()) call." + ); // Clean up before complete work with PubNub client instance. - client.unsubscribe_all(); + pubnub.unsubscribe_all(); tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; Ok(()) diff --git a/examples/subscribe_with_presence_state.rs b/examples/subscribe_with_presence_state.rs new file mode 100644 index 00000000..423ef7ac --- /dev/null +++ b/examples/subscribe_with_presence_state.rs @@ -0,0 +1,156 @@ +use std::collections::HashMap; + +use futures::StreamExt; +use serde::Deserialize; +use std::env; + +use pubnub::subscribe::{SubscriptionOptions, SubscriptionParams}; +use pubnub::{ + dx::subscribe::Update, + subscribe::{EventEmitter, EventSubscriber}, + Keyset, PubNubClientBuilder, +}; + +#[derive(Debug, Deserialize)] +struct Message { + // Allowing dead code because we don't use these fields + // in this example. + #[allow(dead_code)] + url: String, + #[allow(dead_code)] + description: String, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let publish_key = env::var("SDK_PUB_KEY")?; + let subscribe_key = env::var("SDK_SUB_KEY")?; + + let pubnub = PubNubClientBuilder::with_reqwest_transport() + .with_keyset(Keyset { + subscribe_key, + publish_key: Some(publish_key), + secret_key: None, + }) + .with_user_id("user_id") + .with_filter_expression("some_filter") + .with_heartbeat_value(100) + .with_heartbeat_interval(5) + .build()?; + + println!("running!"); + + // Setting up state which will be associated with the user id as long as he is + // subscribed and not timeout. + pubnub + .set_presence_state(HashMap::::from([ + ( + "is_doing".to_string(), + "Nothing... Just hanging around...".to_string(), + ), + ("flag".to_string(), "false".to_string()), + ])) + .channels(["my_channel".into(), "other_channel".into()].to_vec()) + .user_id("user_id") + .execute() + .await?; + + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + + let subscription = pubnub.subscription(SubscriptionParams { + channels: Some(&["my_channel", "other_channel"]), + channel_groups: None, + options: Some(vec![SubscriptionOptions::ReceivePresenceEvents]), + }); + subscription.subscribe(); + let subscription_clone = subscription.clone_empty(); + + // Attach connection status to the PubNub client instance. + tokio::spawn( + pubnub + .status_stream() + .for_each(|status| async move { println!("\nstatus: {:?}", status) }), + ); + + tokio::spawn(subscription.stream().for_each(|event| async move { + match event { + Update::Message(message) | Update::Signal(message) => { + // Deserialize the message payload as you wish + match serde_json::from_slice::(&message.data) { + Ok(message) => println!("(a) defined message: {:?}", message), + Err(_) => { + println!("(a) other message: {:?}", String::from_utf8(message.data)) + } + } + } + Update::Presence(presence) => { + println!("(a) presence: {:?}", presence) + } + Update::AppContext(object) => { + println!("(a) object: {:?}", object) + } + Update::MessageAction(action) => { + println!("(a) message action: {:?}", action) + } + Update::File(file) => { + println!("(a) file: {:?}", file) + } + } + })); + + // Explicitly listen only for real-time `message` updates. + tokio::spawn( + subscription_clone + .messages_stream() + .for_each(|message| async move { + // Deserialize the message payload as you wish + match serde_json::from_slice::(&message.data) { + Ok(message) => println!("(b) defined message: {:?}", message), + Err(_) => { + println!("(b) other message: {:?}", String::from_utf8(message.data)) + } + } + }), + ); + + // Explicitly listen only for real-time `file` updates. + tokio::spawn( + subscription_clone + .files_stream() + .for_each(|file| async move { println!("(b) file: {:?}", file) }), + ); + + // Sleep for a minute. Now you can send messages to the channels + // "my_channel" and "other_channel" and see them printed in the console. + // You can use the publishing example or [PubNub console](https://www.pubnub.com/docs/console/) + // to send messages. + tokio::time::sleep(tokio::time::Duration::from_secs(15)).await; + + // You can also cancel the subscription at any time. + // subscription.unsubscribe(); + + println!("\nDisconnect from the real-time data stream"); + pubnub.disconnect(); + + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + + println!("\nReconnect to the real-time data stream"); + pubnub.reconnect(None); + + // Let event engine process unsubscribe request + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + + // If Subscription or Subscription will go out of scope they will unsubscribe. + // drop(subscription); + // drop(subscription_clone); + + println!( + "\nUnsubscribe from all data streams. To restore call `subscription.subscribe()` or \ + `subscription.subscribe_with_timetoken(Some()) call." + ); + // Clean up before complete work with PubNub client instance. + pubnub.unsubscribe_all(); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + Ok(()) +} diff --git a/src/core/event_engine/effect.rs b/src/core/event_engine/effect.rs index 3e632977..4e6e5cce 100644 --- a/src/core/event_engine/effect.rs +++ b/src/core/event_engine/effect.rs @@ -18,4 +18,14 @@ pub(crate) trait Effect: Send + Sync { /// Cancel any ongoing effect's work. fn cancel(&self); + + /// Check whether effect has been cancelled. + /// + /// Event engine dispatch effects asynchronously and there is a chance that + /// effect already has been cancelled. + /// + /// # Returns + /// + /// `true` if effect has been cancelled. + fn is_cancelled(&self) -> bool; } diff --git a/src/core/event_engine/effect_dispatcher.rs b/src/core/event_engine/effect_dispatcher.rs index 153eb1f5..ea219e49 100644 --- a/src/core/event_engine/effect_dispatcher.rs +++ b/src/core/event_engine/effect_dispatcher.rs @@ -85,6 +85,11 @@ where let cloned_self = cloned_self.clone(); runtime_clone.spawn(async move { + // There is no need to spawn effect which already has been + // cancelled. + if effect.is_cancelled() { + return; + } let events = effect.run().await; if invocation.is_managed() { @@ -197,6 +202,10 @@ mod should { fn cancel(&self) { // Do nothing. } + + fn is_cancelled(&self) -> bool { + false + } } enum TestInvocation { diff --git a/src/core/event_engine/mod.rs b/src/core/event_engine/mod.rs index 7cda7a88..dae326ec 100644 --- a/src/core/event_engine/mod.rs +++ b/src/core/event_engine/mod.rs @@ -314,6 +314,10 @@ mod should { fn cancel(&self) { // Do nothing. } + + fn is_cancelled(&self) -> bool { + false + } } enum TestInvocation { diff --git a/src/dx/presence/event_engine/effect_handler.rs b/src/dx/presence/event_engine/effect_handler.rs index 633043c8..482a74a3 100644 --- a/src/dx/presence/event_engine/effect_handler.rs +++ b/src/dx/presence/event_engine/effect_handler.rs @@ -4,6 +4,7 @@ //! event engine for use async_channel::Sender; +use spin::RwLock; use uuid::Uuid; use crate::{ @@ -77,6 +78,7 @@ impl EffectHandler for PresenceEffectH reason, } => Some(PresenceEffect::DelayedHeartbeat { id: Uuid::new_v4().to_string(), + cancelled: RwLock::new(false), input: input.clone(), attempts: *attempts, reason: reason.clone(), @@ -91,6 +93,7 @@ impl EffectHandler for PresenceEffectH }), PresenceEffectInvocation::Wait { input } => Some(PresenceEffect::Wait { id: Uuid::new_v4().to_string(), + cancelled: RwLock::new(false), input: input.clone(), executor: self.wait_call.clone(), cancellation_channel: self.cancellation_channel.clone(), diff --git a/src/dx/presence/event_engine/effects/heartbeat.rs b/src/dx/presence/event_engine/effects/heartbeat.rs index e987b76d..ecd14cb2 100644 --- a/src/dx/presence/event_engine/effects/heartbeat.rs +++ b/src/dx/presence/event_engine/effects/heartbeat.rs @@ -46,7 +46,14 @@ pub(super) async fn execute( effect_id, }) .map_ok_or_else( - |error| vec![PresenceEvent::HeartbeatFailure { reason: error }], + |error| { + log::error!("Handshake error: {:?}", error); + + // Cancel is possible and no retries should be done. + (!matches!(error, PubNubError::EffectCanceled)) + .then(|| vec![PresenceEvent::HeartbeatFailure { reason: error }]) + .unwrap_or(vec![]) + }, |_| vec![PresenceEvent::HeartbeatSuccess], ) .await @@ -183,6 +190,37 @@ mod it_should { )); } + #[tokio::test] + async fn return_empty_event_on_effect_cancel_err() { + let mocked_heartbeat_function: Arc = + Arc::new(move |_| async move { Err(PubNubError::EffectCanceled) }.boxed()); + + let result = execute( + &PresenceInput::new( + &Some(vec!["ch1".to_string()]), + &Some(vec!["cg1".to_string()]), + ), + 5, + Some(PubNubError::Transport { + details: "test".into(), + response: Some(Box::new(TransportResponse { + status: 500, + ..Default::default() + })), + }), + "id", + &RequestRetryConfiguration::Linear { + max_retry: 5, + delay: 2, + excluded_endpoints: None, + }, + &mocked_heartbeat_function, + ) + .await; + + assert!(result.is_empty()); + } + #[tokio::test] async fn return_heartbeat_give_up_event_on_error_with_none_auto_retry_policy() { let mocked_heartbeat_function: Arc = Arc::new(move |_| { diff --git a/src/dx/presence/event_engine/effects/mod.rs b/src/dx/presence/event_engine/effects/mod.rs index 4dceb748..07528ada 100644 --- a/src/dx/presence/event_engine/effects/mod.rs +++ b/src/dx/presence/event_engine/effects/mod.rs @@ -2,6 +2,7 @@ use async_channel::Sender; use futures::future::BoxFuture; +use spin::RwLock; use crate::{ core::{ @@ -73,6 +74,9 @@ pub(crate) enum PresenceEffect { /// Unique effect identifier. id: String, + /// Whether delayed heartbeat effect has been cancelled or not. + cancelled: RwLock, + /// User input with channels and groups. /// /// Object contains list of channels and groups for which `user_id` @@ -123,6 +127,9 @@ pub(crate) enum PresenceEffect { /// Unique effect identifier. id: String, + /// Whether wait effect has been cancelled or not. + cancelled: RwLock, + /// User input with channels and groups. /// /// Object contains list of channels and groups for which `user_id` @@ -176,16 +183,6 @@ impl Debug for PresenceEffect { impl Effect for PresenceEffect { type Invocation = PresenceEffectInvocation; - fn id(&self) -> String { - match self { - Self::Heartbeat { id, .. } - | Self::DelayedHeartbeat { id, .. } - | Self::Leave { id, .. } - | Self::Wait { id, .. } => id, - } - .into() - } - fn name(&self) -> String { match self { Self::Heartbeat { .. } => "HEARTBEAT", @@ -196,6 +193,16 @@ impl Effect for PresenceEffect { .into() } + fn id(&self) -> String { + match self { + Self::Heartbeat { id, .. } + | Self::DelayedHeartbeat { id, .. } + | Self::Leave { id, .. } + | Self::Wait { id, .. } => id, + } + .into() + } + async fn run(&self) -> Vec<::Event> { match self { Self::Heartbeat { @@ -245,14 +252,20 @@ impl Effect for PresenceEffect { match self { PresenceEffect::DelayedHeartbeat { id, + cancelled, cancellation_channel, .. } | PresenceEffect::Wait { id, + cancelled, cancellation_channel, .. } => { + { + let mut cancelled_slot = cancelled.write(); + *cancelled_slot = true; + } cancellation_channel .send_blocking(id.clone()) .expect("Cancellation pipe is broken!"); @@ -260,6 +273,15 @@ impl Effect for PresenceEffect { _ => { /* cannot cancel other effects */ } } } + + fn is_cancelled(&self) -> bool { + match self { + Self::DelayedHeartbeat { cancelled, .. } | Self::Wait { cancelled, .. } => { + *cancelled.read() + } + _ => false, + } + } } #[cfg(test)] @@ -273,6 +295,7 @@ mod it_should { let effect = PresenceEffect::Wait { id: Uuid::new_v4().to_string(), + cancelled: RwLock::new(false), input: PresenceInput::new(&None, &None), executor: Arc::new(|_| Box::pin(async move { Ok(()) })), cancellation_channel: tx, @@ -288,6 +311,7 @@ mod it_should { let effect = PresenceEffect::DelayedHeartbeat { id: Uuid::new_v4().to_string(), + cancelled: RwLock::new(false), input: PresenceInput::new(&None, &None), attempts: 0, reason: PubNubError::EffectCanceled, diff --git a/src/dx/pubnub_client.rs b/src/dx/pubnub_client.rs index b5df8a8f..dd2650d1 100644 --- a/src/dx/pubnub_client.rs +++ b/src/dx/pubnub_client.rs @@ -76,7 +76,7 @@ use crate::{ /// // note that `with_reqwest_transport` requires `reqwest` feature /// // to be enabled (default) /// # fn main() -> Result<(), pubnub::core::PubNubError> { -/// let client = PubNubClientBuilder::with_reqwest_transport() +/// let pubnub = PubNubClientBuilder::with_reqwest_transport() /// .with_keyset(Keyset { /// publish_key: Some("pub-c-abc123"), /// subscribe_key: "sub-c-abc123", @@ -112,7 +112,7 @@ use crate::{ /// // note that MyTransport must implement the `Transport` trait /// let transport = MyTransport::new(); /// -/// let client = PubNubClientBuilder::with_transport(MyTransport) +/// let pubnub = PubNubClientBuilder::with_transport(MyTransport) /// .with_keyset(Keyset { /// publish_key: Some("pub-c-abc123"), /// subscribe_key: "sub-c-abc123", @@ -161,7 +161,7 @@ pub type PubNubGenericClient = PubNubClientInstance, D /// // note that `with_reqwest_transport` requires `reqwest` feature /// // to be enabled (default) /// # fn main() -> Result<(), pubnub::core::PubNubError> { -/// let client = PubNubClientBuilder::with_reqwest_transport() +/// let pubnub = PubNubClientBuilder::with_reqwest_transport() /// .with_keyset(Keyset { /// publish_key: Some("pub-c-abc123"), /// subscribe_key: "sub-c-abc123", @@ -197,7 +197,7 @@ pub type PubNubGenericClient = PubNubClientInstance, D /// // note that MyTransport must implement the `Transport` trait /// let transport = MyTransport::new(); /// -/// let client = PubNubClientBuilder::with_transport(MyTransport) +/// let pubnub = PubNubClientBuilder::with_transport(MyTransport) /// .with_keyset(Keyset { /// publish_key: Some("pub-c-abc123"), /// subscribe_key: "sub-c-abc123", @@ -417,7 +417,7 @@ impl PubNubClientInstance { /// use pubnub::{PubNubClient, PubNubClientBuilder, Keyset}; /// /// # fn main() -> Result<(), pubnub::core::PubNubError> { - /// let client = // PubNubClient + /// let pubnub = // PubNubClient /// # PubNubClientBuilder::with_reqwest_transport() /// # .with_keyset(Keyset { /// # subscribe_key: "demo", @@ -426,7 +426,7 @@ impl PubNubClientInstance { /// # }) /// # .with_user_id("uuid") /// # .build()?; - /// let channel = client.channel("my_channel"); + /// let channel = pubnub.channel("my_channel"); /// # Ok(()) /// # } /// ``` @@ -464,7 +464,7 @@ impl PubNubClientInstance { /// use pubnub::{PubNubClient, PubNubClientBuilder, Keyset}; /// /// # fn main() -> Result<(), pubnub::core::PubNubError> { - /// let client = // PubNubClient + /// let pubnub = // PubNubClient /// # PubNubClientBuilder::with_reqwest_transport() /// # .with_keyset(Keyset { /// # subscribe_key: "demo", @@ -473,7 +473,7 @@ impl PubNubClientInstance { /// # }) /// # .with_user_id("uuid") /// # .build()?; - /// let channel = client.channels(&["my_channel_1", "my_channel_2"]); + /// let channels = pubnub.channels(&["my_channel_1", "my_channel_2"]); /// # Ok(()) /// # } /// ``` @@ -517,7 +517,7 @@ impl PubNubClientInstance { /// use pubnub::{PubNubClient, PubNubClientBuilder, Keyset}; /// /// # fn main() -> Result<(), pubnub::core::PubNubError> { - /// let client = // PubNubClient + /// let pubnub = // PubNubClient /// # PubNubClientBuilder::with_reqwest_transport() /// # .with_keyset(Keyset { /// # subscribe_key: "demo", @@ -526,7 +526,7 @@ impl PubNubClientInstance { /// # }) /// # .with_user_id("uuid") /// # .build()?; - /// let channel_group = client.channel_group("my_group"); + /// let channel_group = pubnub.channel_group("my_group"); /// # Ok(()) /// # } /// ``` @@ -565,7 +565,7 @@ impl PubNubClientInstance { /// use pubnub::{PubNubClient, PubNubClientBuilder, Keyset}; /// /// # fn main() -> Result<(), pubnub::core::PubNubError> { - /// let client = // PubNubClient + /// let pubnub = // PubNubClient /// # PubNubClientBuilder::with_reqwest_transport() /// # .with_keyset(Keyset { /// # subscribe_key: "demo", @@ -574,7 +574,7 @@ impl PubNubClientInstance { /// # }) /// # .with_user_id("uuid") /// # .build()?; - /// let channel_groups = client.channel_groups(&["my_group_1", "my_group_2"]); + /// let channel_groups = pubnub.channel_groups(&["my_group_1", "my_group_2"]); /// # Ok(()) /// # } /// ``` @@ -620,7 +620,7 @@ impl PubNubClientInstance { /// use pubnub::{PubNubClient, PubNubClientBuilder, Keyset}; /// /// # fn main() -> Result<(), pubnub::core::PubNubError> { - /// let client = // PubNubClient + /// let pubnub = // PubNubClient /// # PubNubClientBuilder::with_reqwest_transport() /// # .with_keyset(Keyset { /// # subscribe_key: "demo", @@ -629,7 +629,7 @@ impl PubNubClientInstance { /// # }) /// # .with_user_id("uuid") /// # .build()?; - /// let channel_metadata = client.channel_metadata("channel_meta"); + /// let channel_metadata = pubnub.channel_metadata("channel_meta"); /// # Ok(()) /// # } /// ``` @@ -670,7 +670,7 @@ impl PubNubClientInstance { /// use pubnub::{PubNubClient, PubNubClientBuilder, Keyset}; /// /// # fn main() -> Result<(), pubnub::core::PubNubError> { - /// let client = // PubNubClient + /// let pubnub = // PubNubClient /// # PubNubClientBuilder::with_reqwest_transport() /// # .with_keyset(Keyset { /// # subscribe_key: "demo", @@ -679,7 +679,7 @@ impl PubNubClientInstance { /// # }) /// # .with_user_id("uuid") /// # .build()?; - /// let channels_metadata = client.channels_metadata( + /// let channels_metadata = pubnub.channels_metadata( /// &["channel_meta_1", "channel_meta_2"] /// ); /// # Ok(()) @@ -727,7 +727,7 @@ impl PubNubClientInstance { /// use pubnub::{PubNubClient, PubNubClientBuilder, Keyset}; /// /// # fn main() -> Result<(), pubnub::core::PubNubError> { - /// let client = // PubNubClient + /// let pubnub = // PubNubClient /// # PubNubClientBuilder::with_reqwest_transport() /// # .with_keyset(Keyset { /// # subscribe_key: "demo", @@ -736,7 +736,7 @@ impl PubNubClientInstance { /// # }) /// # .with_user_id("uuid") /// # .build()?; - /// let user_metadata = client.user_metadata("user_meta"); + /// let user_metadata = pubnub.user_metadata("user_meta"); /// # Ok(()) /// # } /// ``` @@ -776,7 +776,7 @@ impl PubNubClientInstance { /// use pubnub::{PubNubClient, PubNubClientBuilder, Keyset}; /// /// # fn main() -> Result<(), pubnub::core::PubNubError> { - /// let client = // PubNubClient + /// let pubnub = // PubNubClient /// # PubNubClientBuilder::with_reqwest_transport() /// # .with_keyset(Keyset { /// # subscribe_key: "demo", @@ -785,7 +785,7 @@ impl PubNubClientInstance { /// # }) /// # .with_user_id("uuid") /// # .build()?; - /// let users_metadata = client.users_metadata(&["user_meta_1", "user_meta_2"]); + /// let users_metadata = pubnub.users_metadata(&["user_meta_1", "user_meta_2"]); /// # Ok(()) /// # } /// ``` @@ -823,7 +823,7 @@ impl PubNubClientInstance { /// /// # fn main() -> Result<(), pubnub::core::PubNubError> { /// let token = ""; - /// let client = // PubNubClient + /// let pubnub = // PubNubClient /// # PubNubClientBuilder::with_reqwest_transport() /// # .with_keyset(Keyset { /// # subscribe_key: "demo", @@ -832,7 +832,7 @@ impl PubNubClientInstance { /// # }) /// # .with_user_id("uuid") /// # .build()?; - /// client.set_token(token); + /// pubnub.set_token(token); /// // Now client has access to all endpoints for which `token` has /// // permissions. /// # Ok(()) @@ -854,7 +854,7 @@ impl PubNubClientInstance { /// /// # fn main() -> Result<(), pubnub::core::PubNubError> { /// # let token = ""; - /// let client = // PubNubClient + /// let pubnub = // PubNubClient /// # PubNubClientBuilder::with_reqwest_transport() /// # .with_keyset(Keyset { /// # subscribe_key: "demo", @@ -863,8 +863,8 @@ impl PubNubClientInstance { /// # }) /// # .with_user_id("uuid") /// # .build()?; - /// # client.set_token(token); - /// println!("Current authentication token: {:?}", client.get_token()); + /// # pubnub.set_token(token); + /// println!("Current authentication token: {:?}", pubnub.get_token()); /// // Now client has access to all endpoints for which `token` has /// // permissions. /// # Ok(()) @@ -1293,7 +1293,7 @@ impl PubNubClientBuilder { /// // note that MyTransport must implement the `Transport` trait /// let transport = MyTransport::new(); /// - /// let client = PubNubClientBuilder::with_transport(transport) + /// let pubnub = PubNubClientBuilder::with_transport(transport) /// .with_keyset(Keyset { /// publish_key: Some("pub-c-abc123"), /// subscribe_key: "sub-c-abc123", @@ -1343,7 +1343,7 @@ impl PubNubClientBuilder { /// // note that MyTransport must implement the `Transport` trait /// let transport = MyTransport::new(); /// - /// let client = PubNubClientBuilder::with_transport(transport) + /// let pubnub = PubNubClientBuilder::with_transport(transport) /// .with_keyset(Keyset { /// publish_key: Some("pub-c-abc123"), /// subscribe_key: "sub-c-abc123", @@ -1392,7 +1392,7 @@ impl PubNubClientBuilder { /// // note that MyTransport must implement the `Transport` trait /// let transport = MyTransport::new(); /// - /// let client = PubNubClientBuilder::with_blocking_transport(transport) + /// let pubnub = PubNubClientBuilder::with_blocking_transport(transport) /// .with_keyset(Keyset { /// publish_key: Some("pub-c-abc123"), /// subscribe_key: "sub-c-abc123", @@ -1445,7 +1445,7 @@ impl PubNubClientBuilder { /// // note that MyTransport must implement the `Transport` trait /// let transport = MyTransport::new(); /// - /// let client = PubNubClientBuilder::with_blocking_transport(transport) + /// let pubnub = PubNubClientBuilder::with_blocking_transport(transport) /// .with_keyset(Keyset { /// publish_key: Some("pub-c-abc123"), /// subscribe_key: "sub-c-abc123", @@ -1591,7 +1591,7 @@ impl PubNubClientRuntimeBuilder { /// # fn main() -> Result<(), Box> { /// // note that with_reqwest_transport is only available when /// // the `reqwest` feature is enabled (default) - /// let client = PubNubClientBuilder::with_reqwest_transport() + /// let pubnub = PubNubClientBuilder::with_reqwest_transport() /// .with_runtime(MyRuntime) /// .with_keyset(Keyset { /// subscribe_key: "sub-c-abc123", @@ -1653,7 +1653,7 @@ impl PubNubClientRuntimeBuilder { /// # fn main() -> Result<(), Box> { /// // note that with_reqwest_transport is only available when /// // the `reqwest` feature is enabled (default) - /// let client = PubNubClientBuilder::with_reqwest_transport() + /// let pubnub = PubNubClientBuilder::with_reqwest_transport() /// .with_runtime(MyRuntime) /// .with_keyset(Keyset { /// subscribe_key: "sub-c-abc123", diff --git a/src/dx/subscribe/event_dispatcher.rs b/src/dx/subscribe/event_dispatcher.rs index dfd603a7..bb2007a5 100644 --- a/src/dx/subscribe/event_dispatcher.rs +++ b/src/dx/subscribe/event_dispatcher.rs @@ -1,7 +1,7 @@ //! # Event dispatcher module //! //! This module contains the [`EventDispatcher`] type, which is used by -//! [`PubNubClientInstance`], [`Subscription2`] and [`SubscriptionSet`] to let +//! [`PubNubClientInstance`], [`Subscription`] and [`SubscriptionSet`] to let //! users attach listeners to the specific event types. use spin::{RwLock, RwLockReadGuard, RwLockWriteGuard}; diff --git a/src/dx/subscribe/event_engine/effect_handler.rs b/src/dx/subscribe/event_engine/effect_handler.rs index 5c5b2104..462512ae 100644 --- a/src/dx/subscribe/event_engine/effect_handler.rs +++ b/src/dx/subscribe/event_engine/effect_handler.rs @@ -1,4 +1,5 @@ use async_channel::Sender; +use spin::rwlock::RwLock; use uuid::Uuid; use crate::core::RequestRetryConfiguration; @@ -60,6 +61,7 @@ impl EffectHandler for SubscribeEffe SubscribeEffectInvocation::Handshake { input, cursor } => { Some(SubscribeEffect::Handshake { id: Uuid::new_v4().to_string(), + cancelled: RwLock::new(false), input: input.clone(), cursor: cursor.clone(), executor: self.subscribe_call.clone(), @@ -73,6 +75,7 @@ impl EffectHandler for SubscribeEffe reason, } => Some(SubscribeEffect::HandshakeReconnect { id: Uuid::new_v4().to_string(), + cancelled: RwLock::new(false), input: input.clone(), cursor: cursor.clone(), attempts: *attempts, @@ -84,6 +87,7 @@ impl EffectHandler for SubscribeEffe SubscribeEffectInvocation::Receive { input, cursor } => { Some(SubscribeEffect::Receive { id: Uuid::new_v4().to_string(), + cancelled: RwLock::new(false), input: input.clone(), cursor: cursor.clone(), executor: self.subscribe_call.clone(), @@ -97,6 +101,7 @@ impl EffectHandler for SubscribeEffe reason, } => Some(SubscribeEffect::ReceiveReconnect { id: Uuid::new_v4().to_string(), + cancelled: RwLock::new(false), input: input.clone(), cursor: cursor.clone(), attempts: *attempts, diff --git a/src/dx/subscribe/event_engine/effects/handshake.rs b/src/dx/subscribe/event_engine/effects/handshake.rs index cc0a3315..c0fab74b 100644 --- a/src/dx/subscribe/event_engine/effects/handshake.rs +++ b/src/dx/subscribe/event_engine/effects/handshake.rs @@ -1,6 +1,7 @@ use futures::TryFutureExt; use log::info; +use crate::core::PubNubError; use crate::subscribe::SubscriptionCursor; use crate::{ dx::subscribe::event_engine::{ @@ -36,7 +37,11 @@ pub(super) async fn execute( .map_ok_or_else( |error| { log::error!("Handshake error: {:?}", error); - vec![SubscribeEvent::HandshakeFailure { reason: error }] + + // Cancel is possible and no retries should be done. + (!matches!(error, PubNubError::EffectCanceled)) + .then(|| vec![SubscribeEvent::HandshakeFailure { reason: error }]) + .unwrap_or(vec![]) }, |subscribe_result| { let cursor = { @@ -126,4 +131,23 @@ mod should { SubscribeEvent::HandshakeFailure { .. } )); } + + #[tokio::test] + async fn return_empty_event_on_effect_cancel_err() { + let mock_handshake_function: Arc = + Arc::new(move |_| async move { Err(PubNubError::EffectCanceled) }.boxed()); + + let result = execute( + &SubscriptionInput::new( + &Some(vec!["ch1".to_string()]), + &Some(vec!["cg1".to_string()]), + ), + &None, + "id", + &mock_handshake_function, + ) + .await; + + assert!(result.is_empty()); + } } diff --git a/src/dx/subscribe/event_engine/effects/handshake_reconnection.rs b/src/dx/subscribe/event_engine/effects/handshake_reconnection.rs index 8a89ba9f..17651655 100644 --- a/src/dx/subscribe/event_engine/effects/handshake_reconnection.rs +++ b/src/dx/subscribe/event_engine/effects/handshake_reconnection.rs @@ -21,7 +21,9 @@ pub(super) async fn execute( retry_policy: &RequestRetryConfiguration, executor: &Arc, ) -> Vec { - if !retry_policy.retriable(Some("/v2/subscribe"), &attempt, Some(&reason)) { + if !matches!(reason, PubNubError::EffectCanceled) + && !retry_policy.retriable(Some("/v2/subscribe"), &attempt, Some(&reason)) + { return vec![SubscribeEvent::HandshakeReconnectGiveUp { reason }]; } @@ -176,6 +178,38 @@ mod should { )); } + #[tokio::test] + async fn return_empty_event_on_effect_cancel_err() { + let mock_handshake_function: Arc = + Arc::new(move |_| async move { Err(PubNubError::EffectCanceled) }.boxed()); + + let result = execute( + &SubscriptionInput::new( + &Some(vec!["ch1".to_string()]), + &Some(vec!["cg1".to_string()]), + ), + &None, + 1, + PubNubError::Transport { + details: "test".into(), + response: Some(Box::new(TransportResponse { + status: 500, + ..Default::default() + })), + }, + "id", + &RequestRetryConfiguration::Linear { + delay: 0, + max_retry: 1, + excluded_endpoints: None, + }, + &mock_handshake_function, + ) + .await; + + assert!(result.is_empty()); + } + #[tokio::test] async fn return_handshake_reconnect_give_up_event_on_err_with_none_auto_retry_policy() { let mock_handshake_function: Arc = Arc::new(move |_| { diff --git a/src/dx/subscribe/event_engine/effects/mod.rs b/src/dx/subscribe/event_engine/effects/mod.rs index a51f9199..f5893c97 100644 --- a/src/dx/subscribe/event_engine/effects/mod.rs +++ b/src/dx/subscribe/event_engine/effects/mod.rs @@ -2,6 +2,7 @@ use async_channel::Sender; use futures::future::BoxFuture; +use spin::RwLock; use crate::{ core::{event_engine::Effect, PubNubError, RequestRetryConfiguration}, @@ -65,6 +66,9 @@ pub(crate) enum SubscribeEffect { /// Unique effect identifier. id: String, + /// Whether handshake effect has been cancelled or not. + cancelled: RwLock, + /// User input with channels and groups. /// /// Object contains list of channels and channel groups which will be @@ -93,6 +97,9 @@ pub(crate) enum SubscribeEffect { /// Unique effect identifier. id: String, + /// Whether handshake reconnect effect has been cancelled or not. + cancelled: RwLock, + /// User input with channels and groups. /// /// Object contains list of channels and channel groups which has been @@ -132,6 +139,9 @@ pub(crate) enum SubscribeEffect { /// Unique effect identifier. id: String, + /// Whether receive effect has been cancelled or not. + cancelled: RwLock, + /// User input with channels and groups. /// /// Object contains list of channels and channel groups for which @@ -160,6 +170,9 @@ pub(crate) enum SubscribeEffect { /// Unique effect identifier. id: String, + /// Whether receive reconnect effect has been cancelled or not. + cancelled: RwLock, + /// User input with channels and groups. /// /// Object contains list of channels and channel groups which has been @@ -385,24 +398,32 @@ impl Effect for SubscribeEffect { match self { Self::Handshake { id, + cancelled, cancellation_channel, .. } | Self::HandshakeReconnect { id, + cancelled, cancellation_channel, .. } | Self::Receive { id, + cancelled, cancellation_channel, .. } | Self::ReceiveReconnect { id, + cancelled, cancellation_channel, .. } => { + { + let mut cancelled_slot = cancelled.write(); + *cancelled_slot = true; + } cancellation_channel .send_blocking(id.clone()) .expect("cancellation pipe is broken!"); @@ -410,6 +431,16 @@ impl Effect for SubscribeEffect { _ => { /* cannot cancel other effects */ } } } + + fn is_cancelled(&self) -> bool { + match self { + Self::Handshake { cancelled, .. } + | Self::HandshakeReconnect { cancelled, .. } + | Self::Receive { cancelled, .. } + | Self::ReceiveReconnect { cancelled, .. } => *cancelled.read(), + _ => false, + } + } } #[cfg(test)] @@ -424,6 +455,7 @@ mod should { let effect = SubscribeEffect::Handshake { id: Uuid::new_v4().to_string(), + cancelled: RwLock::new(false), input: SubscriptionInput::new(&None, &None), cursor: None, executor: Arc::new(|_| { diff --git a/src/dx/subscribe/event_engine/effects/receive.rs b/src/dx/subscribe/event_engine/effects/receive.rs index c3dc2402..fdd4cd42 100644 --- a/src/dx/subscribe/event_engine/effects/receive.rs +++ b/src/dx/subscribe/event_engine/effects/receive.rs @@ -128,4 +128,23 @@ mod should { SubscribeEvent::ReceiveFailure { .. } )); } + + #[tokio::test] + async fn return_empty_event_on_effect_cancel_err() { + let mock_receive_function: Arc = + Arc::new(move |_| async move { Err(PubNubError::EffectCanceled) }.boxed()); + + let result = execute( + &SubscriptionInput::new( + &Some(vec!["ch1".to_string()]), + &Some(vec!["cg1".to_string()]), + ), + &Default::default(), + "id", + &mock_receive_function, + ) + .await; + + assert!(result.is_empty()); + } } diff --git a/src/dx/subscribe/event_engine/effects/receive_reconnection.rs b/src/dx/subscribe/event_engine/effects/receive_reconnection.rs index ca7efa7d..5e0d8e36 100644 --- a/src/dx/subscribe/event_engine/effects/receive_reconnection.rs +++ b/src/dx/subscribe/event_engine/effects/receive_reconnection.rs @@ -23,7 +23,9 @@ pub(crate) async fn execute( retry_policy: &RequestRetryConfiguration, executor: &Arc, ) -> Vec { - if !retry_policy.retriable(Some("/v2/subscribe"), &attempt, Some(&reason)) { + if !matches!(reason, PubNubError::EffectCanceled) + && !retry_policy.retriable(Some("/v2/subscribe"), &attempt, Some(&reason)) + { return vec![SubscribeEvent::ReceiveReconnectGiveUp { reason }]; } @@ -225,6 +227,38 @@ mod should { )); } + #[tokio::test] + async fn return_empty_event_on_effect_cancel_err() { + let mock_receive_function: Arc = + Arc::new(move |_| async move { Err(PubNubError::EffectCanceled) }.boxed()); + + let result = execute( + &SubscriptionInput::new( + &Some(vec!["ch1".to_string()]), + &Some(vec!["cg1".to_string()]), + ), + &Default::default(), + 10, + PubNubError::Transport { + details: "test".into(), + response: Some(Box::new(TransportResponse { + status: 500, + ..Default::default() + })), + }, + "id", + &RequestRetryConfiguration::Linear { + max_retry: 20, + delay: 0, + excluded_endpoints: None, + }, + &mock_receive_function, + ) + .await; + + assert!(result.is_empty()); + } + #[tokio::test] async fn return_receive_reconnect_give_up_event_on_err_with_none_auto_retry_policy() { let mock_receive_function: Arc = Arc::new(move |_| { diff --git a/src/dx/subscribe/event_engine/state.rs b/src/dx/subscribe/event_engine/state.rs index 6174e196..f5debe80 100644 --- a/src/dx/subscribe/event_engine/state.rs +++ b/src/dx/subscribe/event_engine/state.rs @@ -231,7 +231,10 @@ impl SubscribeState { input: SubscriptionInput::new(channels, channel_groups), cursor: cursor.clone(), }), - None, + Some(vec![EmitStatus(ConnectionStatus::SubscriptionChanged { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + })]), )) } Self::ReceiveFailed { cursor, .. } => Some(self.transition_to( @@ -290,7 +293,10 @@ impl SubscribeState { input: SubscriptionInput::new(channels, channel_groups), cursor: restore_cursor.clone(), }), - None, + Some(vec![EmitStatus(ConnectionStatus::SubscriptionChanged { + channels: channels.clone(), + channel_groups: channel_groups.clone(), + })]), )), Self::ReceiveFailed { .. } => Some(self.transition_to( Some(Self::Handshaking { diff --git a/src/dx/subscribe/mod.rs b/src/dx/subscribe/mod.rs index e01a8c33..27c4c8e0 100644 --- a/src/dx/subscribe/mod.rs +++ b/src/dx/subscribe/mod.rs @@ -148,7 +148,7 @@ where /// /// # #[tokio::main] /// # async fn main() -> Result<(), pubnub::core::PubNubError> { - /// let client = // PubNubClient + /// let pubnub = // PubNubClient /// # PubNubClientBuilder::with_reqwest_transport() /// # .with_keyset(Keyset { /// # subscribe_key: "demo", @@ -160,8 +160,8 @@ where /// // ... /// // We need to pass client into other component which would like to /// // have own listeners to handle real-time events. - /// let empty_client = client.clone_empty(); - /// // self.other_component(empty_client); + /// let empty_pubnub_client = pubnub.clone_empty(); + /// // self.other_component(empty_pubnub_client); /// # Ok(()) /// # } /// ``` @@ -199,12 +199,16 @@ where /// /// ```rust,no_run /// use futures::StreamExt; - /// use pubnub::{PubNubClient, PubNubClientBuilder, Keyset, subscribe::EventEmitter}; + /// use pubnub::{ + /// subscribe::{ + /// EventEmitter, {EventSubscriber, SubscriptionParams}, + /// }, + /// Keyset, PubNubClient, PubNubClientBuilder, + /// }; /// /// # #[tokio::main] /// # async fn main() -> Result<(), pubnub::core::PubNubError> { - /// use pubnub::subscribe::{EventSubscriber, SubscriptionParams}; - /// let client = // PubNubClient + /// let pubnub = // PubNubClient /// # PubNubClientBuilder::with_reqwest_transport() /// # .with_keyset(Keyset { /// # subscribe_key: "demo", @@ -213,7 +217,7 @@ where /// # }) /// # .with_user_id("uuid") /// # .build()?; - /// let subscription = client.subscription(SubscriptionParams { + /// let subscription = pubnub.subscription(SubscriptionParams { /// channels: Some(&["my_channel_1", "my_channel_2", "my_channel_3"]), /// channel_groups: None, /// options: None @@ -257,14 +261,16 @@ where /// /// ```no_run /// use futures::StreamExt; - /// use pubnub::dx::subscribe::{EventEmitter, SubscribeStreamEvent, Update}; + /// use pubnub::{ + /// subscribe::{ + /// EventEmitter, {EventSubscriber, SubscriptionParams, Update}, + /// }, + /// Keyset, PubNubClient, PubNubClientBuilder, + /// }; /// /// # #[tokio::main] /// # async fn main() -> Result<(), Box> { - /// # use pubnub::{Keyset, PubNubClientBuilder}; - /// use pubnub::subscribe::SubscriptionParams; - /// # - /// # let client = PubNubClientBuilder::with_reqwest_transport() + /// # let pubnub = PubNubClientBuilder::with_reqwest_transport() /// # .with_keyset(Keyset { /// # subscribe_key: "demo", /// # publish_key: Some("demo"), @@ -272,14 +278,14 @@ where /// # }) /// # .with_user_id("user_id") /// # .build()?; - /// # let subscription = client.subscription(SubscriptionParams { + /// # let subscription = pubnub.subscription(SubscriptionParams { /// # channels: Some(&["channel"]), /// # channel_groups: None, /// # options: None /// # }); /// # let stream = // DataStream /// # subscription.messages_stream(); - /// client.disconnect(); + /// pubnub.disconnect(); /// # Ok(()) /// # } /// ``` @@ -330,14 +336,16 @@ where /// /// ```no_run /// use futures::StreamExt; - /// use pubnub::dx::subscribe::{EventEmitter, SubscribeStreamEvent, Update}; + /// use pubnub::{ + /// subscribe::{ + /// EventEmitter, {EventSubscriber, SubscriptionParams, Update}, + /// }, + /// Keyset, PubNubClient, PubNubClientBuilder, + /// }; /// /// # #[tokio::main] /// # async fn main() -> Result<(), Box> { - /// # use pubnub::{Keyset, PubNubClientBuilder}; - /// use pubnub::subscribe::SubscriptionParams; - /// # - /// # let client = PubNubClientBuilder::with_reqwest_transport() + /// # let pubnub = PubNubClientBuilder::with_reqwest_transport() /// # .with_keyset(Keyset { /// # subscribe_key: "demo", /// # publish_key: Some("demo"), @@ -345,7 +353,7 @@ where /// # }) /// # .with_user_id("user_id") /// # .build()?; - /// # let subscription = client.subscription(SubscriptionParams { + /// # let subscription = pubnub.subscription(SubscriptionParams { /// # channels: Some(&["channel"]), /// # channel_groups: None, /// # options: None @@ -353,8 +361,8 @@ where /// # let stream = // DataStream /// # subscription.messages_stream(); /// # // ..... - /// # client.disconnect(); - /// client.reconnect(None); + /// # pubnub.disconnect(); + /// pubnub.reconnect(None); /// # Ok(()) /// # } /// ``` @@ -363,6 +371,7 @@ where pub fn reconnect(&self, cursor: Option) { #[cfg(feature = "presence")] let mut input: Option = None; + let cursor = cursor.or_else(|| self.cursor.read().clone()); if let Some(manager) = self.subscription_manager(false).read().as_ref() { #[cfg(feature = "presence")] @@ -403,6 +412,9 @@ where /// created [`Subscription`] and [`SubscriptionSet`]. pub fn unsubscribe_all(&self) { { + let mut cursor = self.cursor.write(); + *cursor = None; + if let Some(manager) = self.subscription_manager(false).write().as_mut() { manager.unregister_all() } @@ -624,13 +636,16 @@ impl PubNubClientInstance { /// /// ```no_run // Starts listening for real-time updates /// use futures::StreamExt; - /// use pubnub::dx::subscribe::{SubscribeStreamEvent, Update}; + /// use pubnub::{ + /// subscribe::{ + /// EventEmitter, {EventSubscriber, SubscriptionParams, Update}, + /// }, + /// Keyset, PubNubClient, PubNubClientBuilder, + /// }; /// /// # #[tokio::main] /// # async fn main() -> Result<(), Box> { - /// # use pubnub::{Keyset, PubNubClientBuilder}; - /// # - /// # let client = PubNubClientBuilder::with_reqwest_transport() + /// # let pubnub = PubNubClientBuilder::with_reqwest_transport() /// # .with_keyset(Keyset { /// # subscribe_key: "demo", /// # publish_key: Some("demo"), @@ -638,7 +653,7 @@ impl PubNubClientInstance { /// # }) /// # .with_user_id("user_id") /// # .build()?; - /// client + /// pubnub /// .subscribe_raw() /// .channels(["hello".into(), "world".into()].to_vec()) /// .execute()? @@ -900,7 +915,7 @@ mod should { channel_groups: Some(&["group_a"]), options: Some(vec![SubscriptionOptions::ReceivePresenceEvents]), }); - subscription.subscribe(None); + subscription.subscribe(); let status = client.status_stream().next().await.unwrap(); let _ = subscription.messages_stream().next().await.unwrap(); diff --git a/src/dx/subscribe/subscription.rs b/src/dx/subscribe/subscription.rs index 3196b248..9bca92ff 100644 --- a/src/dx/subscribe/subscription.rs +++ b/src/dx/subscribe/subscription.rs @@ -44,7 +44,7 @@ use crate::{ /// }; /// /// # fn main() -> Result<(), pubnub::core::PubNubError> { -/// let client = // PubNubClient +/// let pubnub = // PubNubClient /// # PubNubClientBuilder::with_reqwest_transport() /// # .with_keyset(Keyset { /// # subscribe_key: "demo", @@ -53,7 +53,7 @@ use crate::{ /// # }) /// # .with_user_id("uuid") /// # .build()?; -/// let channel = client.channel("my_channel"); +/// let channel = pubnub.channel("my_channel"); /// // Creating Subscription instance for the Channel entity to subscribe and listen /// // for real-time events. /// let subscription = channel.subscription(None); @@ -72,7 +72,7 @@ use crate::{ /// }; /// /// # fn main() -> Result<(), pubnub::core::PubNubError> { -/// let client = // PubNubClient +/// let pubnub = // PubNubClient /// # PubNubClientBuilder::with_reqwest_transport() /// # .with_keyset(Keyset { /// # subscribe_key: "demo", @@ -81,7 +81,7 @@ use crate::{ /// # }) /// # .with_user_id("uuid") /// # .build()?; -/// let channels = client.channels(&["my_channel_1", "my_channel_2"]); +/// let channels = pubnub.channels(&["my_channel_1", "my_channel_2"]); /// // Two `Subscription` instances can be added to create `SubscriptionSet` which can be used /// // to attach listeners and subscribe in one place for both subscriptions used in addition /// // operation. @@ -206,7 +206,7 @@ where /// /// # #[tokio::main] /// # async fn main() -> Result<(), pubnub::core::PubNubError> { - /// let client = // PubNubClient + /// let pubnub = // PubNubClient /// # PubNubClientBuilder::with_reqwest_transport() /// # .with_keyset(Keyset { /// # subscribe_key: "demo", @@ -215,7 +215,7 @@ where /// # }) /// # .with_user_id("uuid") /// # .build()?; - /// let channel = client.channel("my_channel"); + /// let channel = pubnub.channel("my_channel"); /// // Creating Subscription instance for the Channel entity to subscribe and listen /// // for real-time events. /// let subscription = channel.subscription(None); @@ -425,6 +425,29 @@ where *self.is_subscribed.read() } + /// Register `Subscription` within `SubscriptionManager`. + /// + /// # Arguments + /// + /// - `cursor` - Subscription real-time events catch up cursor. + fn register_with_cursor(&self, cursor: Option) { + let Some(client) = self.client.upgrade().clone() else { + return; + }; + + { + if let Some(manager) = client.subscription_manager(true).write().as_mut() { + // Mark entities as "in use" by subscription. + self.entity.increase_subscriptions_count(); + + if let Some((_, handler)) = self.clones.read().iter().next() { + let handler: Weak + Send + Sync> = handler.clone(); + manager.register(&handler, cursor); + } + } + } + } + /// Filters the given list of `Update` events based on the subscription /// input and the current timetoken. /// @@ -482,36 +505,44 @@ where T: Transport + Send + Sync + 'static, D: Deserializer + Send + Sync + 'static, { - fn subscribe(&self, cursor: Option) { + fn subscribe(&self) { let mut is_subscribed = self.is_subscribed.write(); if *is_subscribed { return; } *is_subscribed = true; - if cursor.is_some() { - let mut cursor_slot = self.cursor.write(); - if let Some(current_cursor) = cursor_slot.as_ref() { - let catchup_cursor = cursor.clone().unwrap_or_default(); - catchup_cursor - .gt(current_cursor) - .then(|| *cursor_slot = Some(catchup_cursor)); - } else { - *cursor_slot = cursor.clone(); - } + self.register_with_cursor(self.cursor.read().clone()); + } + + fn subscribe_with_timetoken(&self, cursor: SC) + where + SC: Into, + { + let mut is_subscribed = self.is_subscribed.write(); + if *is_subscribed { + return; } + *is_subscribed = true; - if let Some(client) = self.client().upgrade().clone() { - if let Some(manager) = client.subscription_manager(true).write().as_mut() { - // Mark entities as "in use" by subscription. - self.entity.increase_subscriptions_count(); + let user_cursor = cursor.into(); + let cursor = user_cursor.is_valid().then_some(user_cursor); - if let Some((_, handler)) = self.clones.read().iter().next() { - let handler: Weak + Send + Sync> = handler.clone(); - manager.register(&handler, cursor); + { + if cursor.is_some() { + let mut cursor_slot = self.cursor.write(); + if let Some(current_cursor) = cursor_slot.as_ref() { + let catchup_cursor = cursor.clone().unwrap_or_default(); + catchup_cursor + .gt(current_cursor) + .then(|| *cursor_slot = Some(catchup_cursor)); + } else { + *cursor_slot = cursor.clone(); } } } + + self.register_with_cursor(cursor); } fn unsubscribe(&self) { @@ -521,6 +552,9 @@ where return; } *is_subscribed_slot = false; + + let mut cursor = self.cursor.write(); + *cursor = None; } if let Some(client) = self.client().upgrade().clone() { diff --git a/src/dx/subscribe/subscription_set.rs b/src/dx/subscribe/subscription_set.rs index 3e6d58ce..a967aceb 100644 --- a/src/dx/subscribe/subscription_set.rs +++ b/src/dx/subscribe/subscription_set.rs @@ -39,12 +39,11 @@ use crate::{ /// ### Multiplexed subscription /// /// ```rust -/// use pubnub::{PubNubClient, PubNubClientBuilder, Keyset}; +/// use pubnub::{subscribe::SubscriptionParams, Keyset, PubNubClient, PubNubClientBuilder}; /// /// # #[tokio::main] /// # async fn main() -> Result<(), pubnub::core::PubNubError> { -/// use pubnub::subscribe::SubscriptionParams; -/// let client = // PubNubClient +/// let pubnub = // PubNubClient /// # PubNubClientBuilder::with_reqwest_transport() /// # .with_keyset(Keyset { /// # subscribe_key: "demo", @@ -53,7 +52,7 @@ use crate::{ /// # }) /// # .with_user_id("uuid") /// # .build()?; -/// let subscription = client.subscription(SubscriptionParams { +/// let subscription = pubnub.subscription(SubscriptionParams { /// channels: Some(&["my_channel_1", "my_channel_2"]), /// channel_groups:Some(&["my_group"]), /// options:None @@ -72,7 +71,7 @@ use crate::{ /// /// # #[tokio::main] /// # async fn main() -> Result<(), pubnub::core::PubNubError> { -/// let client = // PubNubClient +/// let pubnub = // PubNubClient /// # PubNubClientBuilder::with_reqwest_transport() /// # .with_keyset(Keyset { /// # subscribe_key: "demo", @@ -81,7 +80,7 @@ use crate::{ /// # }) /// # .with_user_id("uuid") /// # .build()?; -/// let channels = client.channels(&["my_channel_1", "my_channel_2"]); +/// let channels = pubnub.channels(&["my_channel_1", "my_channel_2"]); /// // Two `Subscription` instances can be added to create `SubscriptionSet` which can be used /// // to attach listeners and subscribe in one place for both subscriptions used in addition /// // operation. @@ -231,12 +230,11 @@ where /// # Example /// /// ```rust - /// use pubnub::{PubNubClient, PubNubClientBuilder, Keyset}; + /// use pubnub::{subscribe::SubscriptionParams, Keyset, PubNubClient, PubNubClientBuilder}; /// /// # #[tokio::main] /// # async fn main() -> Result<(), pubnub::core::PubNubError> { - /// use pubnub::subscribe::SubscriptionParams; - /// let client = // PubNubClient + /// let pubnub = // PubNubClient /// # PubNubClientBuilder::with_reqwest_transport() /// # .with_keyset(Keyset { /// # subscribe_key: "demo", @@ -245,7 +243,7 @@ where /// # }) /// # .with_user_id("uuid") /// # .build()?; - /// let subscription = client.subscription(SubscriptionParams { + /// let subscription = pubnub.subscription(SubscriptionParams { /// channels: Some(&["my_channel_1", "my_channel_2"]), /// channel_groups: Some(&["my_group"]), /// options: None @@ -270,6 +268,171 @@ where } } + /// Adds a list of subscriptions to the subscription set. + /// + /// # Arguments + /// + /// * `subscriptions` - A vector of `Subscription` objects to be added to + /// the subscription set. + /// + /// # Example + /// + /// ```rust + /// use pubnub::{ + /// subscribe::{Subscriber, SubscriptionParams}, + /// Keyset, PubNubClient, PubNubClientBuilder, + /// }; + /// + /// # #[tokio::main] + /// # async fn main() -> Result<(), pubnub::core::PubNubError> { + /// let pubnub = // PubNubClient + /// # PubNubClientBuilder::with_reqwest_transport() + /// # .with_keyset(Keyset { + /// # subscribe_key: "demo", + /// # publish_key: Some("demo"), + /// # secret_key: Some("demo") + /// # }) + /// # .with_user_id("uuid") + /// # .build()?; + /// // Create subscription set for list of channels and groups. + /// let mut subscription = pubnub.subscription(SubscriptionParams { + /// channels: Some(&["my_channel_1", "my_channel_2"]), + /// channel_groups: Some(&["my_group"]), + /// options: None + /// }); + /// let channel = pubnub.channel("my_channel_3"); + /// // Creating Subscription instance for the Channel entity to subscribe and listen + /// // for real-time events. + /// let channel_subscription = channel.subscription(None); + /// // It is possible to separately add listeners to the `channel_subscription` + /// // and call `subscribe()` or `subscribe_with_timetoken(..)`. + /// + /// // Add channel subscription to the set. + /// subscription.add_subscriptions(vec![channel_subscription]); + /// # Ok(()) + /// # } + /// ``` + pub fn add_subscriptions(&mut self, subscriptions: Vec>) { + let unique_subscriptions = + { Self::unique_subscriptions_from_list(Some(self), subscriptions) }; + + { + let mut subscription_input = self.subscription_input.write(); + *subscription_input += Self::subscription_input_from_list(&unique_subscriptions, true); + self.subscriptions + .write() + .extend(unique_subscriptions.clone()); + } + + // Check whether subscription change required or not. + if !self.is_subscribed() || unique_subscriptions.is_empty() { + return; + } + + let Some(client) = self.client().upgrade().clone() else { + return; + }; + + if let Some(manager) = client.subscription_manager(true).write().as_mut() { + // Mark entities as "in-use" by subscription. + unique_subscriptions.iter().for_each(|subscription| { + subscription.entity.increase_subscriptions_count(); + }); + + // Notify manager to update its state with new subscriptions. + if let Some((_, handler)) = self.clones.read().iter().next() { + let handler: Weak + Send + Sync> = handler.clone(); + manager.update(&handler, None); + } + }; + } + + /// Subtracts the given subscriptions from the subscription set. + /// + /// # Arguments + /// + /// * `subscriptions` - A vector of `Subscription` objects to be removed + /// from the subscription set. + /// + /// # Example + /// + /// ```rust + /// use pubnub::{ + /// subscribe::{Subscriber, SubscriptionParams}, + /// Keyset, PubNubClient, PubNubClientBuilder, + /// }; + /// + /// # #[tokio::main] + /// # async fn main() -> Result<(), pubnub::core::PubNubError> { + /// let pubnub = // PubNubClient + /// # PubNubClientBuilder::with_reqwest_transport() + /// # .with_keyset(Keyset { + /// # subscribe_key: "demo", + /// # publish_key: Some("demo"), + /// # secret_key: Some("demo") + /// # }) + /// # .with_user_id("uuid") + /// # .build()?; + /// // Create subscription set for list of channels and groups. + /// let mut subscription = pubnub.subscription(SubscriptionParams { + /// channels: Some(&["my_channel_1", "my_channel_2"]), + /// channel_groups: Some(&["my_group"]), + /// options: None + /// }); + /// let channel = pubnub.channel("my_channel_3"); + /// // Creating Subscription instance for the Channel entity to subscribe and listen + /// // for real-time events. + /// let channel_subscription = channel.subscription(None); + /// // It is possible to separately add listeners to the `channel_subscription` + /// // and call `subscribe()` or `subscribe_with_timetoken(..)`. + /// + /// // Add channel subscription to the set. + /// subscription.add_subscriptions(vec![channel_subscription.clone()]); + /// + /// // After some time it needed to remove subscriptions from the set. + /// subscription.sub_subscriptions(vec![channel_subscription]); + /// # Ok(()) + /// # } + /// ``` + pub fn sub_subscriptions(&mut self, subscriptions: Vec>) { + let removed: Vec> = { + let subscriptions_slot = self.subscriptions.read(); + Self::unique_subscriptions_from_list(None, subscriptions) + .into_iter() + .filter(|subscription| subscriptions_slot.contains(subscription)) + .collect() + }; + + { + let mut subscription_input = self.subscription_input.write(); + *subscription_input -= Self::subscription_input_from_list(&removed, true); + let mut subscription_slot = self.subscriptions.write(); + subscription_slot.retain(|subscription| !removed.contains(subscription)); + } + + // Check whether subscription change required or not. + if !self.is_subscribed() || removed.is_empty() { + return; + } + + let Some(client) = self.client().upgrade().clone() else { + return; + }; + + // Mark entities as "not in-use" by subscription. + removed.iter().for_each(|subscription| { + subscription.entity.decrease_subscriptions_count(); + }); + + if let Some(manager) = client.subscription_manager(true).write().as_mut() { + // Notify manager to update its state with removed subscriptions. + if let Some((_, handler)) = self.clones.read().iter().next() { + let handler: Weak + Send + Sync> = handler.clone(); + manager.update(&handler, Some(&removed)); + } + }; + } + /// Aggregate subscriptions' input. /// /// # Arguments @@ -447,40 +610,7 @@ where D: Deserializer + Send + Sync, { fn add_assign(&mut self, rhs: Self) { - let unique_subscriptions = { - let other_subscriptions = rhs.subscriptions.read(); - SubscriptionSet::unique_subscriptions_from_list(Some(self), other_subscriptions.clone()) - }; - - { - let mut subscription_input = self.subscription_input.write(); - *subscription_input += Self::subscription_input_from_list(&unique_subscriptions, true); - self.subscriptions - .write() - .extend(unique_subscriptions.clone()); - } - - // Check whether subscription change required or not. - if !self.is_subscribed() || unique_subscriptions.is_empty() { - return; - } - - let Some(client) = self.client().upgrade().clone() else { - return; - }; - - if let Some(manager) = client.subscription_manager(true).write().as_mut() { - // Mark entities as "in-use" by subscription. - unique_subscriptions.iter().for_each(|subscription| { - subscription.entity.increase_subscriptions_count(); - }); - - // Notify manager to update its state with new subscriptions. - if let Some((_, handler)) = self.clones.read().iter().next() { - let handler: Weak + Send + Sync> = handler.clone(); - manager.update(&handler, None); - } - }; + self.add_subscriptions(rhs.subscriptions.read().clone()); } } impl Sub for SubscriptionSet @@ -511,43 +641,7 @@ where D: Deserializer + Send + Sync, { fn sub_assign(&mut self, rhs: Self) { - let removed: Vec> = { - let other_subscriptions = rhs.subscriptions.read(); - let subscriptions_slot = self.subscriptions.read(); - Self::unique_subscriptions_from_list(None, other_subscriptions.clone()) - .into_iter() - .filter(|subscription| subscriptions_slot.contains(subscription)) - .collect() - }; - - { - let mut subscription_input = self.subscription_input.write(); - *subscription_input -= Self::subscription_input_from_list(&removed, true); - let mut subscription_slot = self.subscriptions.write(); - subscription_slot.retain(|subscription| !removed.contains(subscription)); - } - - // Check whether subscription change required or not. - if !self.is_subscribed() || removed.is_empty() { - return; - } - - let Some(client) = self.client().upgrade().clone() else { - return; - }; - - // Mark entities as "not in-use" by subscription. - removed.iter().for_each(|subscription| { - subscription.entity.decrease_subscriptions_count(); - }); - - if let Some(manager) = client.subscription_manager(true).write().as_mut() { - // Notify manager to update its state with removed subscriptions. - if let Some((_, handler)) = self.clones.read().iter().next() { - let handler: Weak + Send + Sync> = handler.clone(); - manager.update(&handler, Some(&removed)); - } - }; + self.sub_subscriptions(rhs.subscriptions.read().clone()); } } @@ -633,7 +727,7 @@ where /// # #[tokio::main] /// # async fn main() -> Result<(), pubnub::core::PubNubError> { /// use pubnub::subscribe::SubscriptionParams; - /// let client = // PubNubClient + /// let pubnub = // PubNubClient /// # PubNubClientBuilder::with_reqwest_transport() /// # .with_keyset(Keyset { /// # subscribe_key: "demo", @@ -642,7 +736,7 @@ where /// # }) /// # .with_user_id("uuid") /// # .build()?; - /// let subscription = client.subscription(SubscriptionParams { + /// let subscription = pubnub.subscription(SubscriptionParams { /// channels: Some(&["my_channel_1", "my_channel_2"]), /// channel_groups: Some(&["my_group"]), /// options: None @@ -694,6 +788,32 @@ where *self.is_subscribed.read() } + /// Register `Subscription` within `SubscriptionManager`. + /// + /// # Arguments + /// + /// - `cursor` - Subscription real-time events catch up cursor. + fn register_with_cursor(&self, cursor: Option) { + let Some(client) = self.client().upgrade().clone() else { + return; + }; + + { + let manager = client.subscription_manager(true); + if let Some(manager) = manager.write().as_mut() { + // Mark entities as "in-use" by subscription. + self.subscriptions.read().iter().for_each(|subscription| { + subscription.entity.increase_subscriptions_count(); + }); + + if let Some((_, handler)) = self.clones.read().iter().next() { + let handler: Weak + Send + Sync> = handler.clone(); + manager.register(&handler, cursor); + } + }; + } + } + /// Filters the given list of `Update` events based on the subscription /// input and the current timetoken. /// @@ -751,43 +871,44 @@ where T: Transport + Send + Sync + 'static, D: Deserializer + Send + Sync + 'static, { - fn subscribe(&self, cursor: Option) { + fn subscribe(&self) { let mut is_subscribed = self.is_subscribed.write(); if *is_subscribed { return; } *is_subscribed = true; - if cursor.is_some() { - let mut cursor_slot = self.cursor.write(); - if let Some(current_cursor) = cursor_slot.as_ref() { - let catchup_cursor = cursor.clone().unwrap_or_default(); - catchup_cursor - .gt(current_cursor) - .then(|| *cursor_slot = Some(catchup_cursor)); - } else { - *cursor_slot = cursor.clone(); - } - } + self.register_with_cursor(self.cursor.read().clone()) + } - let Some(client) = self.client().upgrade().clone() else { + fn subscribe_with_timetoken(&self, cursor: SC) + where + SC: Into, + { + let mut is_subscribed = self.is_subscribed.write(); + if *is_subscribed { return; - }; + } + *is_subscribed = true; - { - let manager = client.subscription_manager(true); - if let Some(manager) = manager.write().as_mut() { - // Mark entities as "in-use" by subscription. - self.subscriptions.read().iter().for_each(|subscription| { - subscription.entity.increase_subscriptions_count(); - }); + let user_cursor = cursor.into(); + let cursor = user_cursor.is_valid().then_some(user_cursor); - if let Some((_, handler)) = self.clones.read().iter().next() { - let handler: Weak + Send + Sync> = handler.clone(); - manager.register(&handler, cursor); + { + if cursor.is_some() { + let mut cursor_slot = self.cursor.write(); + if let Some(current_cursor) = cursor_slot.as_ref() { + let catchup_cursor = cursor.clone().unwrap_or_default(); + catchup_cursor + .gt(current_cursor) + .then(|| *cursor_slot = Some(catchup_cursor)); + } else { + *cursor_slot = cursor.clone(); } - }; + } } + + self.register_with_cursor(cursor); } fn unsubscribe(&self) { @@ -797,6 +918,9 @@ where return; } *is_subscribed_slot = false; + + let mut cursor = self.cursor.write(); + *cursor = None; } let Some(client) = self.client().upgrade().clone() else { diff --git a/src/dx/subscribe/traits/event_subscribe.rs b/src/dx/subscribe/traits/event_subscribe.rs index 9778fee4..2f01b7a1 100644 --- a/src/dx/subscribe/traits/event_subscribe.rs +++ b/src/dx/subscribe/traits/event_subscribe.rs @@ -11,8 +11,21 @@ use crate::subscribe::SubscriptionCursor; /// Types that implement this trait can change activity of real-time events /// processing for specific or set of entities. pub trait EventSubscriber { - /// Use receiver to subscribe for real-time updates. - fn subscribe(&self, cursor: Option); + /// Use the receiver to subscribe for real-time updates. + fn subscribe(&self); + + /// Use the receiver to subscribe for real-time updates starting at a + /// specific time. + /// + /// # Arguments + /// + /// - `cursor` - `SubscriptionCursor` from which, the client should try to + /// catch up on real-time events. `cursor` also can be provided as `usize` + /// or `String` with a 17-digit PubNub timetoken. If `cursor` doesn't + /// satisfy the requirements, it will be ignored. + fn subscribe_with_timetoken(&self, cursor: SC) + where + SC: Into; /// Use receiver to stop receiving real-time updates. fn unsubscribe(&self); diff --git a/src/dx/subscribe/types.rs b/src/dx/subscribe/types.rs index 23cee281..a44a8a6a 100644 --- a/src/dx/subscribe/types.rs +++ b/src/dx/subscribe/types.rs @@ -90,7 +90,7 @@ pub enum SubscriptionOptions { /// Whether presence events should be received. /// /// Whether presence updates for `userId` should be delivered through - /// [`Subscription2`] listener streams or not. + /// [`Subscription`] and [`SubscriptionSet`] listener streams or not. ReceivePresenceEvents, } @@ -128,52 +128,6 @@ pub struct SubscriptionCursor { pub region: u32, } -impl PartialOrd for SubscriptionCursor { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } - - fn lt(&self, other: &Self) -> bool { - let lhs = self.timetoken.parse::().expect("Invalid timetoken"); - let rhs = other.timetoken.parse::().expect("Invalid timetoken"); - lhs < rhs - } - - fn le(&self, other: &Self) -> bool { - let lhs = self.timetoken.parse::().expect("Invalid timetoken"); - let rhs = other.timetoken.parse::().expect("Invalid timetoken"); - lhs <= rhs - } - - fn gt(&self, other: &Self) -> bool { - let lhs = self.timetoken.parse::().expect("Invalid timetoken"); - let rhs = other.timetoken.parse::().expect("Invalid timetoken"); - lhs > rhs - } - - fn ge(&self, other: &Self) -> bool { - let lhs = self.timetoken.parse::().expect("Invalid timetoken"); - let rhs = other.timetoken.parse::().expect("Invalid timetoken"); - lhs >= rhs - } -} - -impl Ord for SubscriptionCursor { - fn cmp(&self, other: &Self) -> Ordering { - self.partial_cmp(other).unwrap_or(Ordering::Equal) - } -} - -impl Debug for SubscriptionCursor { - fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { - write!( - f, - "SubscriptionCursor {{ timetoken: {}, region: {} }}", - self.timetoken, self.region - ) - } -} - /// Subscription statuses. #[derive(Clone, PartialEq)] pub enum ConnectionStatus { @@ -192,6 +146,20 @@ pub enum ConnectionStatus { /// Unexpected disconnection. DisconnectedUnexpectedly(PubNubError), + + /// List of channels and groups changed in subscription. + SubscriptionChanged { + /// List of channels used in subscription. + /// + /// Channels can be: + /// - regular channels + /// - channel metadata `id`s + /// - user metadata `id`s + channels: Option>, + + /// List of channel groups used in subscription. + channel_groups: Option>, + }, } /// Presence update information. @@ -602,6 +570,21 @@ pub enum MessageActionEvent { Delete, } +impl SubscriptionCursor { + /// Checks if the `timetoken` is valid. + /// + /// A valid `timetoken` should have a length of 17 and contain only numeric + /// characters. + /// + /// # Returns + /// + /// Returns `true` if the `timetoken` is valid, otherwise `false`. + #[cfg(feature = "std")] + pub(crate) fn is_valid(&self) -> bool { + self.timetoken.len() == 17 && self.timetoken.chars().all(char::is_numeric) + } +} + impl Default for SubscriptionCursor { fn default() -> Self { Self { @@ -611,10 +594,103 @@ impl Default for SubscriptionCursor { } } +impl PartialOrd for SubscriptionCursor { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + + fn lt(&self, other: &Self) -> bool { + let lhs = self.timetoken.parse::().expect("Invalid timetoken"); + let rhs = other.timetoken.parse::().expect("Invalid timetoken"); + lhs < rhs + } + + fn le(&self, other: &Self) -> bool { + let lhs = self.timetoken.parse::().expect("Invalid timetoken"); + let rhs = other.timetoken.parse::().expect("Invalid timetoken"); + lhs <= rhs + } + + fn gt(&self, other: &Self) -> bool { + let lhs = self.timetoken.parse::().expect("Invalid timetoken"); + let rhs = other.timetoken.parse::().expect("Invalid timetoken"); + lhs > rhs + } + + fn ge(&self, other: &Self) -> bool { + let lhs = self.timetoken.parse::().expect("Invalid timetoken"); + let rhs = other.timetoken.parse::().expect("Invalid timetoken"); + lhs >= rhs + } +} + +impl Ord for SubscriptionCursor { + fn cmp(&self, other: &Self) -> Ordering { + self.partial_cmp(other).unwrap_or(Ordering::Equal) + } +} + +impl Debug for SubscriptionCursor { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + write!( + f, + "SubscriptionCursor {{ timetoken: {}, region: {} }}", + self.timetoken, self.region + ) + } +} + impl From for SubscriptionCursor { fn from(value: String) -> Self { - Self { - timetoken: value, + let mut timetoken = value; + if timetoken.len() != 17 || !timetoken.chars().all(char::is_numeric) { + timetoken = "-1".into(); + } + + SubscriptionCursor { + timetoken, + ..Default::default() + } + } +} + +impl From<&str> for SubscriptionCursor { + fn from(value: &str) -> Self { + let mut timetoken = value; + if timetoken.len() != 17 || !timetoken.chars().all(char::is_numeric) { + timetoken = "-1"; + } + + SubscriptionCursor { + timetoken: timetoken.to_string(), + ..Default::default() + } + } +} + +impl From for SubscriptionCursor { + fn from(value: usize) -> Self { + let mut timetoken = value.to_string(); + if timetoken.len() != 17 { + timetoken = "-1".into(); + } + + SubscriptionCursor { + timetoken, + ..Default::default() + } + } +} + +impl From for SubscriptionCursor { + fn from(value: u64) -> Self { + let mut timetoken = value.to_string(); + if timetoken.len() != 17 { + timetoken = "-1".into(); + } + + SubscriptionCursor { + timetoken, ..Default::default() } } @@ -671,6 +747,16 @@ impl Debug for ConnectionStatus { ConnectionStatus::DisconnectedUnexpectedly(err) => { write!(f, "DisconnectedUnexpectedly({err:?})") } + Self::SubscriptionChanged { + channels, + channel_groups, + } => { + write!( + f, + "SubscriptionChanged {{ channels: {channels:?}, \ + channel_groups: {channel_groups:?} }}" + ) + } } } } @@ -1111,4 +1197,88 @@ mod should { fn resolve_subscription_field_value(subscription: Option, channel: &str) -> String { resolve_subscription_value(subscription, channel) } + + #[test] + #[cfg(feature = "std")] + fn create_valid_subscription_cursor_as_struct() { + let cursor = SubscriptionCursor { + timetoken: "12345678901234567".into(), + region: 0, + }; + assert!(cursor.is_valid()) + } + + #[test] + #[cfg(feature = "std")] + fn create_valid_subscription_cursor_from_string() { + let cursor: SubscriptionCursor = "12345678901234567".to_string().into(); + assert!(cursor.is_valid()) + } + + #[test] + #[cfg(feature = "std")] + fn create_valid_subscription_cursor_from_string_slice() { + let cursor: SubscriptionCursor = "12345678901234567".into(); + assert!(cursor.is_valid()) + } + + #[test] + #[cfg(feature = "std")] + fn create_valid_subscription_cursor_from_usize() { + let timetoken: usize = 12345678901234567; + let cursor: SubscriptionCursor = timetoken.into(); + assert!(cursor.is_valid()) + } + + #[test] + #[cfg(feature = "std")] + fn create_valid_subscription_cursor_from_u64() { + let timetoken: u64 = 12345678901234567; + let cursor: SubscriptionCursor = timetoken.into(); + assert!(cursor.is_valid()) + } + + #[test] + #[cfg(feature = "std")] + fn create_invalid_subscription_cursor_from_short_string() { + let cursor: SubscriptionCursor = "1234567890123467".to_string().into(); + assert!(!cursor.is_valid()) + } + + #[test] + #[cfg(feature = "std")] + fn create_invalid_subscription_cursor_from_non_numeric_string() { + let cursor: SubscriptionCursor = "123456789a123467".to_string().into(); + assert!(!cursor.is_valid()) + } + + #[test] + #[cfg(feature = "std")] + fn create_invalid_subscription_cursor_from_short_string_slice() { + let cursor: SubscriptionCursor = "1234567890123567".into(); + assert!(!cursor.is_valid()) + } + + #[test] + #[cfg(feature = "std")] + fn create_invalid_subscription_cursor_from_non_numeric_string_slice() { + let cursor: SubscriptionCursor = "1234567890123a567".into(); + assert!(!cursor.is_valid()) + } + + #[test] + #[cfg(feature = "std")] + fn create_invalid_subscription_cursor_from_too_small_usize() { + let timetoken: usize = 123456789012567; + let cursor: SubscriptionCursor = timetoken.into(); + assert!(!cursor.is_valid()) + } + + #[test] + #[cfg(feature = "std")] + fn create_invalid_subscription_cursor_from_too_small_u64() { + let timetoken: u64 = 123901234567; + let cursor: SubscriptionCursor = timetoken.into(); + assert!(!cursor.is_valid()) + } } diff --git a/src/lib.rs b/src/lib.rs index 0e86640b..0b9a37f7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,11 +39,11 @@ //! ```toml //! # default features //! [dependencies] -//! pubnub = "0.5.0" +//! pubnub = "0.6.0" //! //! # all features //! [dependencies] -//! pubnub = { version = "0.5.0", features = ["full"] } +//! pubnub = { version = "0.6.0", features = ["full"] } //! ``` //! //! ### Example @@ -51,17 +51,20 @@ //! Try the following sample code to get up and running quickly! //! //! ```no_run -//! use pubnub::{Keyset, PubNubClientBuilder}; -//! use pubnub::dx::subscribe::{SubscribeStreamEvent, Update}; +//! use pubnub::subscribe::Subscriber; //! use futures::StreamExt; //! use tokio::time::sleep; //! use std::time::Duration; //! use serde_json; -//! +//! use pubnub::{ +//! dx::subscribe::Update, +//! subscribe::EventSubscriber, +//! Keyset, PubNubClientBuilder, +//! }; //! #[tokio::main] //! async fn main() -> Result<(), Box> { //! use pubnub::subscribe::{EventEmitter, SubscriptionParams}; -//! let publish_key = "my_publish_key"; +//! let publish_key = "my_publish_key"; //! let subscribe_key = "my_subscribe_key"; //! let client = PubNubClientBuilder::with_reqwest_transport() //! .with_keyset(Keyset { @@ -71,15 +74,22 @@ //! }) //! .with_user_id("user_id") //! .build()?; +//! //! println!("PubNub instance created"); -//! +//! //! let subscription = client.subscription(SubscriptionParams { //! channels: Some(&["my_channel"]), //! channel_groups: None, //! options: None //! }); //! -//! println!("Subscribed to channel"); +//! let channel_entity = client.channel("my_channel_2"); +//! let channel_entity_subscription = channel_entity.subscription(None); +//! +//! subscription.subscribe(); +//! channel_entity_subscription.subscribe(); +//! +//! println!("Subscribed to channels"); //! //! // Launch a new task to print out each received message //! tokio::spawn(client.status_stream().for_each(|status| async move { @@ -110,7 +120,21 @@ //! } //! })); //! -//! sleep(Duration::from_secs(1)).await; +//! // Explicitly listen only for real-time `message` updates. +//! tokio::spawn( +//! channel_entity_subscription +//! .messages_stream() +//! .for_each(|message| async move { +//! if let Ok(utf8_message) = String::from_utf8(message.data.clone()) { +//! if let Ok(cleaned) = serde_json::from_str::(&utf8_message) { +//! println!("message: {}", cleaned); +//! } +//! } +//! }), +//! ); +//! +//! sleep(Duration::from_secs(2)).await; +//! //! // Send a message to the channel //! client //! .publish_message("hello world!") @@ -119,7 +143,15 @@ //! .execute() //! .await?; //! -//! sleep(Duration::from_secs(10)).await; +//! // Send a message to another channel +//! client +//! .publish_message("hello world on the other channel!") +//! .channel("my_channel_2") +//! .r#type("text-message") +//! .execute() +//! .await?; +//! +//! sleep(Duration::from_secs(15)).await; //! //! Ok(()) //! } @@ -135,11 +167,11 @@ //! ```toml //! # only blocking and access + default features //! [dependencies] -//! pubnub = { version = "0.5.0", features = ["blocking", "access"] } +//! pubnub = { version = "0.6.0", features = ["blocking", "access"] } //! //! # only parse_token + default features //! [dependencies] -//! pubnub = { version = "0.5.0", features = ["parse_token"] } +//! pubnub = { version = "0.6.0", features = ["parse_token"] } //! ``` //! //! ### Available features @@ -178,7 +210,7 @@ //! //! ```toml //! [dependencies] -//! pubnub = { version = "0.5.0", default-features = false, features = ["serde", "publish", +//! pubnub = { version = "0.6.0", default-features = false, features = ["serde", "publish", //! "blocking"] } //! ``` //! diff --git a/tests/presence/presence_steps.rs b/tests/presence/presence_steps.rs index 51a6fa41..ad8e7404 100644 --- a/tests/presence/presence_steps.rs +++ b/tests/presence/presence_steps.rs @@ -138,7 +138,7 @@ async fn join( subscriptions.values().cloned().collect(), options.clone(), ); - subscription.subscribe(None); + subscription.subscribe(); world.subscription = Some(subscription); world.subscriptions = Some(subscriptions); } diff --git a/tests/subscribe/subscribe_steps.rs b/tests/subscribe/subscribe_steps.rs index 529c39eb..b5d05aca 100644 --- a/tests/subscribe/subscribe_steps.rs +++ b/tests/subscribe/subscribe_steps.rs @@ -4,7 +4,7 @@ use cucumber::gherkin::Table; use cucumber::{codegen::Regex, gherkin::Step, then, when}; use futures::{select_biased, FutureExt, StreamExt}; use pubnub::core::RequestRetryConfiguration; -use pubnub::subscribe::{EventEmitter, EventSubscriber, SubscriptionParams}; +use pubnub::subscribe::{EventEmitter, EventSubscriber, SubscriptionCursor, SubscriptionParams}; use std::fs::read_to_string; /// Extract list of events and invocations from log. @@ -122,7 +122,7 @@ async fn subscribe(world: &mut PubNubWorld) { channel_groups: None, options: None, }); - subscription.subscribe(None); + subscription.subscribe(); world.subscription = Some(subscription); }); } @@ -139,7 +139,11 @@ async fn subscribe_with_timetoken(world: &mut PubNubWorld, timetoken: u64) { channel_groups: None, options: None, }); - subscription.subscribe(Some(timetoken.to_string().into())); + + subscription.subscribe_with_timetoken(SubscriptionCursor { + timetoken: timetoken.to_string(), + ..Default::default() + }); world.subscription = Some(subscription); }); }