diff --git a/.pubnub.yml b/.pubnub.yml
index da0930e9..9eb1e8da 100644
--- a/.pubnub.yml
+++ b/.pubnub.yml
@@ -1,9 +1,28 @@
 name: rust
-version: 0.5.0
+version: 0.6.0
 schema: 1
 scm: github.com/pubnub/rust
 files: []
 changelog:
+  - date: 2024-02-07
+    version: 0.6.0
+    changes:
+      - type: feature
+        text: "Make it possible to create `SubscriptionCursor` from the string slice."
+      - type: feature
+        text: "Add `add_subscriptions(..)` and `sub_subscriptions(..)` to `SubscriptionSet` to make it possible in addition to sets manipulation use list of subscriptions."
+      - type: bug
+        text: "Fix issue because of which `cursor` is not reset on `Subscription` and `SubscriptionSet` on unsubscribe."
+      - type: bug
+        text: "Fix issue because of which cancelled effects still asynchronously spawned for processing."
+      - type: improvement
+        text: "Change `client` to `pubnub` in inline docs."
+      - type: improvement
+        text: "Add subscription token validation."
+      - type: improvement
+        text: "Added a method to validate the provided subscription token to conform to PubNub time token requirements with precision."
+      - type: improvement
+        text: "Separate `subscribe` example into two to show separately `subscribe` feature and `presence state` maintenance with subscribe."
   - date: 2024-01-25
     version: 0.5.0
     changes:
diff --git a/Cargo.toml b/Cargo.toml
index afcc3265..7739d7ca 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "pubnub"
-version = "0.5.0"
+version = "0.6.0"
 edition = "2021"
 license-file = "LICENSE"
 authors = ["PubNub <support@pubnub.com>"]
@@ -165,6 +165,10 @@ required-features = ["default"]
 
 [[example]]
 name = "subscribe"
+required-features = ["default", "subscribe"]
+
+[[example]]
+name = "subscribe_with_presence_state"
 required-features = ["default", "subscribe", "presence"]
 
 [[example]]
diff --git a/README.md b/README.md
index 51ede15b..c125e259 100644
--- a/README.md
+++ b/README.md
@@ -36,11 +36,11 @@ Add `pubnub` to your Rust project in the `Cargo.toml` file:
 ```toml
 # default features
 [dependencies]
-pubnub = "0.5.0"
+pubnub = "0.6.0"
 
 # all features
 [dependencies]
-pubnub = { version = "0.5.0", features = ["full"] }
+pubnub = { version = "0.6.0", features = ["full"] }
 ```
 
 ### Example
@@ -48,17 +48,20 @@ pubnub = { version = "0.5.0", features = ["full"] }
 Try the following sample code to get up and running quickly!
 
 ```rust
-use pubnub::{Keyset, PubNubClientBuilder};
-use pubnub::dx::subscribe::{SubscribeStreamEvent, Update};
+use pubnub::subscribe::Subscriber;
 use futures::StreamExt;
 use tokio::time::sleep;
 use std::time::Duration;
 use serde_json;
-
+use pubnub::{
+    dx::subscribe::Update,
+    subscribe::EventSubscriber,
+    Keyset, PubNubClientBuilder,
+};
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
     use pubnub::subscribe::{EventEmitter, SubscriptionParams};
-let publish_key = "my_publish_key";
+    let publish_key = "my_publish_key";
     let subscribe_key = "my_subscribe_key";
     let client = PubNubClientBuilder::with_reqwest_transport()
         .with_keyset(Keyset {
@@ -68,6 +71,7 @@ let publish_key = "my_publish_key";
         })
         .with_user_id("user_id")
         .build()?;
+
     println!("PubNub instance created");
 
     let subscription = client.subscription(SubscriptionParams {
@@ -76,7 +80,13 @@ let publish_key = "my_publish_key";
         options: None
     });
 
-    println!("Subscribed to channel");
+    let channel_entity = client.channel("my_channel_2");
+    let channel_entity_subscription = channel_entity.subscription(None);
+
+    subscription.subscribe();
+    channel_entity_subscription.subscribe();
+
+    println!("Subscribed to channels");
 
     // Launch a new task to print out each received message
     tokio::spawn(client.status_stream().for_each(|status| async move {
@@ -107,7 +117,21 @@ let publish_key = "my_publish_key";
         }
     }));
 
-    sleep(Duration::from_secs(1)).await;
+    // Explicitly listen only for real-time `message` updates.
+    tokio::spawn(
+        channel_entity_subscription
+            .messages_stream()
+            .for_each(|message| async move {
+                if let Ok(utf8_message) = String::from_utf8(message.data.clone()) {
+                    if let Ok(cleaned) = serde_json::from_str::<String>(&utf8_message) {
+                        println!("message: {}", cleaned);
+                    }
+                }
+            }),
+    );
+
+   sleep(Duration::from_secs(2)).await;
+
     // Send a message to the channel
     client
         .publish_message("hello world!")
@@ -116,7 +140,15 @@ let publish_key = "my_publish_key";
         .execute()
         .await?;
 
-    sleep(Duration::from_secs(10)).await;
+   // Send a message to another channel
+    client
+        .publish_message("hello world on the other channel!")
+        .channel("my_channel_2")
+        .r#type("text-message")
+        .execute()
+        .await?;
+
+    sleep(Duration::from_secs(15)).await;
 
     Ok(())
 }
@@ -132,11 +164,11 @@ disable them in the `Cargo.toml` file, like so:
 ```toml
 # only blocking and access + default features
 [dependencies]
-pubnub = { version = "0.5.0", features = ["blocking", "access"] }
+pubnub = { version = "0.6.0", features = ["blocking", "access"] }
 
 # only parse_token + default features
 [dependencies]
-pubnub = { version = "0.5.0", features = ["parse_token"] }
+pubnub = { version = "0.6.0", features = ["parse_token"] }
 ```
 
 ### Available features
@@ -175,7 +207,7 @@ you need, for example:
 
 ```toml
 [dependencies]
-pubnub = { version = "0.5.0", default-features = false, features = ["serde", "publish",
+pubnub = { version = "0.6.0", default-features = false, features = ["serde", "publish",
 "blocking"] }
 ```
 
diff --git a/examples/subscribe.rs b/examples/subscribe.rs
index fd9ab7c4..f89466f0 100644
--- a/examples/subscribe.rs
+++ b/examples/subscribe.rs
@@ -1,5 +1,3 @@
-use std::collections::HashMap;
-
 use futures::StreamExt;
 use serde::Deserialize;
 use std::env;
@@ -26,7 +24,7 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
     let publish_key = env::var("SDK_PUB_KEY")?;
     let subscribe_key = env::var("SDK_SUB_KEY")?;
 
-    let client = PubNubClientBuilder::with_reqwest_transport()
+    let pubnub = PubNubClientBuilder::with_reqwest_transport()
         .with_keyset(Keyset {
             subscribe_key,
             publish_key: Some(publish_key),
@@ -40,36 +38,25 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
 
     println!("running!");
 
-    client
-        .set_presence_state(HashMap::<String, String>::from([
-            (
-                "is_doing".to_string(),
-                "Nothing... Just hanging around...".to_string(),
-            ),
-            ("flag".to_string(), "false".to_string()),
-        ]))
-        .channels(["my_channel".into(), "other_channel".into()].to_vec())
-        .user_id("user_id")
-        .execute()
-        .await?;
-
     tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
 
-    let subscription = client.subscription(SubscriptionParams {
+    let subscription = pubnub.subscription(SubscriptionParams {
         channels: Some(&["my_channel", "other_channel"]),
         channel_groups: None,
         options: Some(vec![SubscriptionOptions::ReceivePresenceEvents]),
     });
-    subscription.subscribe(None);
+    subscription.subscribe();
     let subscription_clone = subscription.clone_empty();
 
     // Attach connection status to the PubNub client instance.
     tokio::spawn(
-        client
+        pubnub
             .status_stream()
             .for_each(|status| async move { println!("\nstatus: {:?}", status) }),
     );
 
+    // Example of the "global" listener for multiplexed subscription object from
+    // PubNub client.
     tokio::spawn(subscription.stream().for_each(|event| async move {
         match event {
             Update::Message(message) | Update::Signal(message) => {
@@ -96,9 +83,11 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
         }
     }));
 
-    tokio::spawn(subscription_clone.stream().for_each(|event| async move {
-        match event {
-            Update::Message(message) | Update::Signal(message) => {
+    // Explicitly listen only for real-time `message` updates.
+    tokio::spawn(
+        subscription_clone
+            .messages_stream()
+            .for_each(|message| async move {
                 // Deserialize the message payload as you wish
                 match serde_json::from_slice::<Message>(&message.data) {
                     Ok(message) => println!("(b) defined message: {:?}", message),
@@ -106,25 +95,19 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
                         println!("(b) other message: {:?}", String::from_utf8(message.data))
                     }
                 }
-            }
-            Update::Presence(presence) => {
-                println!("(b) presence: {:?}", presence)
-            }
-            Update::AppContext(object) => {
-                println!("(b) object: {:?}", object)
-            }
-            Update::MessageAction(action) => {
-                println!("(b) message action: {:?}", action)
-            }
-            Update::File(file) => {
-                println!("(b) file: {:?}", file)
-            }
-        }
-    }));
+            }),
+    );
+
+    // Explicitly listen only for real-time `file` updates.
+    tokio::spawn(
+        subscription_clone
+            .files_stream()
+            .for_each(|file| async move { println!("(b) file: {:?}", file) }),
+    );
 
     // Sleep for a minute. Now you can send messages to the channels
     // "my_channel" and "other_channel" and see them printed in the console.
-    // You can use the publish example or [PubNub console](https://www.pubnub.com/docs/console/)
+    // You can use the publishing example or [PubNub console](https://www.pubnub.com/docs/console/)
     // to send messages.
     tokio::time::sleep(tokio::time::Duration::from_secs(15)).await;
 
@@ -132,12 +115,12 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
     // subscription.unsubscribe();
 
     println!("\nDisconnect from the real-time data stream");
-    client.disconnect();
+    pubnub.disconnect();
 
     tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
 
     println!("\nReconnect to the real-time data stream");
-    client.reconnect(None);
+    pubnub.reconnect(None);
 
     // Let event engine process unsubscribe request
     tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
@@ -147,9 +130,11 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
     // drop(subscription_clone);
 
     println!(
-        "\nUnsubscribe from all data streams. To restore requires `subscription.subscribe(None)` call." );
+        "\nUnsubscribe from all data streams. To restore call `subscription.subscribe()` or \
+        `subscription.subscribe_with_timetoken(Some(<timetoken>)) call."
+    );
     // Clean up before complete work with PubNub client instance.
-    client.unsubscribe_all();
+    pubnub.unsubscribe_all();
     tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
 
     Ok(())
diff --git a/examples/subscribe_with_presence_state.rs b/examples/subscribe_with_presence_state.rs
new file mode 100644
index 00000000..423ef7ac
--- /dev/null
+++ b/examples/subscribe_with_presence_state.rs
@@ -0,0 +1,156 @@
+use std::collections::HashMap;
+
+use futures::StreamExt;
+use serde::Deserialize;
+use std::env;
+
+use pubnub::subscribe::{SubscriptionOptions, SubscriptionParams};
+use pubnub::{
+    dx::subscribe::Update,
+    subscribe::{EventEmitter, EventSubscriber},
+    Keyset, PubNubClientBuilder,
+};
+
+#[derive(Debug, Deserialize)]
+struct Message {
+    // Allowing dead code because we don't use these fields
+    // in this example.
+    #[allow(dead_code)]
+    url: String,
+    #[allow(dead_code)]
+    description: String,
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn snafu::Error>> {
+    let publish_key = env::var("SDK_PUB_KEY")?;
+    let subscribe_key = env::var("SDK_SUB_KEY")?;
+
+    let pubnub = PubNubClientBuilder::with_reqwest_transport()
+        .with_keyset(Keyset {
+            subscribe_key,
+            publish_key: Some(publish_key),
+            secret_key: None,
+        })
+        .with_user_id("user_id")
+        .with_filter_expression("some_filter")
+        .with_heartbeat_value(100)
+        .with_heartbeat_interval(5)
+        .build()?;
+
+    println!("running!");
+
+    // Setting up state which will be associated with the user id as long as he is
+    // subscribed and not timeout.
+    pubnub
+        .set_presence_state(HashMap::<String, String>::from([
+            (
+                "is_doing".to_string(),
+                "Nothing... Just hanging around...".to_string(),
+            ),
+            ("flag".to_string(), "false".to_string()),
+        ]))
+        .channels(["my_channel".into(), "other_channel".into()].to_vec())
+        .user_id("user_id")
+        .execute()
+        .await?;
+
+    tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
+
+    let subscription = pubnub.subscription(SubscriptionParams {
+        channels: Some(&["my_channel", "other_channel"]),
+        channel_groups: None,
+        options: Some(vec![SubscriptionOptions::ReceivePresenceEvents]),
+    });
+    subscription.subscribe();
+    let subscription_clone = subscription.clone_empty();
+
+    // Attach connection status to the PubNub client instance.
+    tokio::spawn(
+        pubnub
+            .status_stream()
+            .for_each(|status| async move { println!("\nstatus: {:?}", status) }),
+    );
+
+    tokio::spawn(subscription.stream().for_each(|event| async move {
+        match event {
+            Update::Message(message) | Update::Signal(message) => {
+                // Deserialize the message payload as you wish
+                match serde_json::from_slice::<Message>(&message.data) {
+                    Ok(message) => println!("(a) defined message: {:?}", message),
+                    Err(_) => {
+                        println!("(a) other message: {:?}", String::from_utf8(message.data))
+                    }
+                }
+            }
+            Update::Presence(presence) => {
+                println!("(a) presence: {:?}", presence)
+            }
+            Update::AppContext(object) => {
+                println!("(a) object: {:?}", object)
+            }
+            Update::MessageAction(action) => {
+                println!("(a) message action: {:?}", action)
+            }
+            Update::File(file) => {
+                println!("(a) file: {:?}", file)
+            }
+        }
+    }));
+
+    // Explicitly listen only for real-time `message` updates.
+    tokio::spawn(
+        subscription_clone
+            .messages_stream()
+            .for_each(|message| async move {
+                // Deserialize the message payload as you wish
+                match serde_json::from_slice::<Message>(&message.data) {
+                    Ok(message) => println!("(b) defined message: {:?}", message),
+                    Err(_) => {
+                        println!("(b) other message: {:?}", String::from_utf8(message.data))
+                    }
+                }
+            }),
+    );
+
+    // Explicitly listen only for real-time `file` updates.
+    tokio::spawn(
+        subscription_clone
+            .files_stream()
+            .for_each(|file| async move { println!("(b) file: {:?}", file) }),
+    );
+
+    // Sleep for a minute. Now you can send messages to the channels
+    // "my_channel" and "other_channel" and see them printed in the console.
+    // You can use the publishing example or [PubNub console](https://www.pubnub.com/docs/console/)
+    // to send messages.
+    tokio::time::sleep(tokio::time::Duration::from_secs(15)).await;
+
+    // You can also cancel the subscription at any time.
+    // subscription.unsubscribe();
+
+    println!("\nDisconnect from the real-time data stream");
+    pubnub.disconnect();
+
+    tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
+
+    println!("\nReconnect to the real-time data stream");
+    pubnub.reconnect(None);
+
+    // Let event engine process unsubscribe request
+    tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
+
+    // If Subscription or Subscription will go out of scope they will unsubscribe.
+    // drop(subscription);
+    // drop(subscription_clone);
+
+    println!(
+        "\nUnsubscribe from all data streams. To restore call `subscription.subscribe()` or \
+        `subscription.subscribe_with_timetoken(Some(<timetoken>)) call."
+    );
+    // Clean up before complete work with PubNub client instance.
+    pubnub.unsubscribe_all();
+    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
+
+    Ok(())
+}
diff --git a/src/core/event_engine/effect.rs b/src/core/event_engine/effect.rs
index 3e632977..4e6e5cce 100644
--- a/src/core/event_engine/effect.rs
+++ b/src/core/event_engine/effect.rs
@@ -18,4 +18,14 @@ pub(crate) trait Effect: Send + Sync {
 
     /// Cancel any ongoing effect's work.
     fn cancel(&self);
+
+    /// Check whether effect has been cancelled.
+    ///
+    /// Event engine dispatch effects asynchronously and there is a chance that
+    /// effect already has been cancelled.
+    ///
+    /// # Returns
+    ///
+    /// `true` if effect has been cancelled.   
+    fn is_cancelled(&self) -> bool;
 }
diff --git a/src/core/event_engine/effect_dispatcher.rs b/src/core/event_engine/effect_dispatcher.rs
index 153eb1f5..ea219e49 100644
--- a/src/core/event_engine/effect_dispatcher.rs
+++ b/src/core/event_engine/effect_dispatcher.rs
@@ -85,6 +85,11 @@ where
                             let cloned_self = cloned_self.clone();
 
                             runtime_clone.spawn(async move {
+                                // There is no need to spawn effect which already has been
+                                // cancelled.
+                                if effect.is_cancelled() {
+                                    return;
+                                }
                                 let events = effect.run().await;
 
                                 if invocation.is_managed() {
@@ -197,6 +202,10 @@ mod should {
         fn cancel(&self) {
             // Do nothing.
         }
+
+        fn is_cancelled(&self) -> bool {
+            false
+        }
     }
 
     enum TestInvocation {
diff --git a/src/core/event_engine/mod.rs b/src/core/event_engine/mod.rs
index 7cda7a88..dae326ec 100644
--- a/src/core/event_engine/mod.rs
+++ b/src/core/event_engine/mod.rs
@@ -314,6 +314,10 @@ mod should {
         fn cancel(&self) {
             // Do nothing.
         }
+
+        fn is_cancelled(&self) -> bool {
+            false
+        }
     }
 
     enum TestInvocation {
diff --git a/src/dx/presence/event_engine/effect_handler.rs b/src/dx/presence/event_engine/effect_handler.rs
index 633043c8..482a74a3 100644
--- a/src/dx/presence/event_engine/effect_handler.rs
+++ b/src/dx/presence/event_engine/effect_handler.rs
@@ -4,6 +4,7 @@
 //! event engine for
 
 use async_channel::Sender;
+use spin::RwLock;
 use uuid::Uuid;
 
 use crate::{
@@ -77,6 +78,7 @@ impl EffectHandler<PresenceEffectInvocation, PresenceEffect> for PresenceEffectH
                 reason,
             } => Some(PresenceEffect::DelayedHeartbeat {
                 id: Uuid::new_v4().to_string(),
+                cancelled: RwLock::new(false),
                 input: input.clone(),
                 attempts: *attempts,
                 reason: reason.clone(),
@@ -91,6 +93,7 @@ impl EffectHandler<PresenceEffectInvocation, PresenceEffect> for PresenceEffectH
             }),
             PresenceEffectInvocation::Wait { input } => Some(PresenceEffect::Wait {
                 id: Uuid::new_v4().to_string(),
+                cancelled: RwLock::new(false),
                 input: input.clone(),
                 executor: self.wait_call.clone(),
                 cancellation_channel: self.cancellation_channel.clone(),
diff --git a/src/dx/presence/event_engine/effects/heartbeat.rs b/src/dx/presence/event_engine/effects/heartbeat.rs
index e987b76d..ecd14cb2 100644
--- a/src/dx/presence/event_engine/effects/heartbeat.rs
+++ b/src/dx/presence/event_engine/effects/heartbeat.rs
@@ -46,7 +46,14 @@ pub(super) async fn execute(
         effect_id,
     })
     .map_ok_or_else(
-        |error| vec![PresenceEvent::HeartbeatFailure { reason: error }],
+        |error| {
+            log::error!("Handshake error: {:?}", error);
+
+            // Cancel is possible and no retries should be done.
+            (!matches!(error, PubNubError::EffectCanceled))
+                .then(|| vec![PresenceEvent::HeartbeatFailure { reason: error }])
+                .unwrap_or(vec![])
+        },
         |_| vec![PresenceEvent::HeartbeatSuccess],
     )
     .await
@@ -183,6 +190,37 @@ mod it_should {
         ));
     }
 
+    #[tokio::test]
+    async fn return_empty_event_on_effect_cancel_err() {
+        let mocked_heartbeat_function: Arc<HeartbeatEffectExecutor> =
+            Arc::new(move |_| async move { Err(PubNubError::EffectCanceled) }.boxed());
+
+        let result = execute(
+            &PresenceInput::new(
+                &Some(vec!["ch1".to_string()]),
+                &Some(vec!["cg1".to_string()]),
+            ),
+            5,
+            Some(PubNubError::Transport {
+                details: "test".into(),
+                response: Some(Box::new(TransportResponse {
+                    status: 500,
+                    ..Default::default()
+                })),
+            }),
+            "id",
+            &RequestRetryConfiguration::Linear {
+                max_retry: 5,
+                delay: 2,
+                excluded_endpoints: None,
+            },
+            &mocked_heartbeat_function,
+        )
+        .await;
+
+        assert!(result.is_empty());
+    }
+
     #[tokio::test]
     async fn return_heartbeat_give_up_event_on_error_with_none_auto_retry_policy() {
         let mocked_heartbeat_function: Arc<HeartbeatEffectExecutor> = Arc::new(move |_| {
diff --git a/src/dx/presence/event_engine/effects/mod.rs b/src/dx/presence/event_engine/effects/mod.rs
index 4dceb748..07528ada 100644
--- a/src/dx/presence/event_engine/effects/mod.rs
+++ b/src/dx/presence/event_engine/effects/mod.rs
@@ -2,6 +2,7 @@
 
 use async_channel::Sender;
 use futures::future::BoxFuture;
+use spin::RwLock;
 
 use crate::{
     core::{
@@ -73,6 +74,9 @@ pub(crate) enum PresenceEffect {
         /// Unique effect identifier.
         id: String,
 
+        /// Whether delayed heartbeat effect has been cancelled or not.
+        cancelled: RwLock<bool>,
+
         /// User input with channels and groups.
         ///
         /// Object contains list of channels and groups for which `user_id`
@@ -123,6 +127,9 @@ pub(crate) enum PresenceEffect {
         /// Unique effect identifier.
         id: String,
 
+        /// Whether wait effect has been cancelled or not.
+        cancelled: RwLock<bool>,
+
         /// User input with channels and groups.
         ///
         /// Object contains list of channels and groups for which `user_id`
@@ -176,16 +183,6 @@ impl Debug for PresenceEffect {
 impl Effect for PresenceEffect {
     type Invocation = PresenceEffectInvocation;
 
-    fn id(&self) -> String {
-        match self {
-            Self::Heartbeat { id, .. }
-            | Self::DelayedHeartbeat { id, .. }
-            | Self::Leave { id, .. }
-            | Self::Wait { id, .. } => id,
-        }
-        .into()
-    }
-
     fn name(&self) -> String {
         match self {
             Self::Heartbeat { .. } => "HEARTBEAT",
@@ -196,6 +193,16 @@ impl Effect for PresenceEffect {
         .into()
     }
 
+    fn id(&self) -> String {
+        match self {
+            Self::Heartbeat { id, .. }
+            | Self::DelayedHeartbeat { id, .. }
+            | Self::Leave { id, .. }
+            | Self::Wait { id, .. } => id,
+        }
+        .into()
+    }
+
     async fn run(&self) -> Vec<<Self::Invocation as EffectInvocation>::Event> {
         match self {
             Self::Heartbeat {
@@ -245,14 +252,20 @@ impl Effect for PresenceEffect {
         match self {
             PresenceEffect::DelayedHeartbeat {
                 id,
+                cancelled,
                 cancellation_channel,
                 ..
             }
             | PresenceEffect::Wait {
                 id,
+                cancelled,
                 cancellation_channel,
                 ..
             } => {
+                {
+                    let mut cancelled_slot = cancelled.write();
+                    *cancelled_slot = true;
+                }
                 cancellation_channel
                     .send_blocking(id.clone())
                     .expect("Cancellation pipe is broken!");
@@ -260,6 +273,15 @@ impl Effect for PresenceEffect {
             _ => { /* cannot cancel other effects */ }
         }
     }
+
+    fn is_cancelled(&self) -> bool {
+        match self {
+            Self::DelayedHeartbeat { cancelled, .. } | Self::Wait { cancelled, .. } => {
+                *cancelled.read()
+            }
+            _ => false,
+        }
+    }
 }
 
 #[cfg(test)]
@@ -273,6 +295,7 @@ mod it_should {
 
         let effect = PresenceEffect::Wait {
             id: Uuid::new_v4().to_string(),
+            cancelled: RwLock::new(false),
             input: PresenceInput::new(&None, &None),
             executor: Arc::new(|_| Box::pin(async move { Ok(()) })),
             cancellation_channel: tx,
@@ -288,6 +311,7 @@ mod it_should {
 
         let effect = PresenceEffect::DelayedHeartbeat {
             id: Uuid::new_v4().to_string(),
+            cancelled: RwLock::new(false),
             input: PresenceInput::new(&None, &None),
             attempts: 0,
             reason: PubNubError::EffectCanceled,
diff --git a/src/dx/pubnub_client.rs b/src/dx/pubnub_client.rs
index b5df8a8f..dd2650d1 100644
--- a/src/dx/pubnub_client.rs
+++ b/src/dx/pubnub_client.rs
@@ -76,7 +76,7 @@ use crate::{
 /// // note that `with_reqwest_transport` requires `reqwest` feature
 /// // to be enabled (default)
 /// # fn main() -> Result<(), pubnub::core::PubNubError> {
-/// let client = PubNubClientBuilder::with_reqwest_transport()
+/// let pubnub = PubNubClientBuilder::with_reqwest_transport()
 ///    .with_keyset(Keyset {
 ///         publish_key: Some("pub-c-abc123"),
 ///         subscribe_key: "sub-c-abc123",
@@ -112,7 +112,7 @@ use crate::{
 /// // note that MyTransport must implement the `Transport` trait
 /// let transport = MyTransport::new();
 ///
-/// let client = PubNubClientBuilder::with_transport(MyTransport)
+/// let pubnub = PubNubClientBuilder::with_transport(MyTransport)
 ///    .with_keyset(Keyset {
 ///         publish_key: Some("pub-c-abc123"),
 ///         subscribe_key: "sub-c-abc123",
@@ -161,7 +161,7 @@ pub type PubNubGenericClient<T, D> = PubNubClientInstance<PubNubMiddleware<T>, D
 /// // note that `with_reqwest_transport` requires `reqwest` feature
 /// // to be enabled (default)
 /// # fn main() -> Result<(), pubnub::core::PubNubError> {
-/// let client = PubNubClientBuilder::with_reqwest_transport()
+/// let pubnub = PubNubClientBuilder::with_reqwest_transport()
 ///    .with_keyset(Keyset {
 ///         publish_key: Some("pub-c-abc123"),
 ///         subscribe_key: "sub-c-abc123",
@@ -197,7 +197,7 @@ pub type PubNubGenericClient<T, D> = PubNubClientInstance<PubNubMiddleware<T>, D
 /// // note that MyTransport must implement the `Transport` trait
 /// let transport = MyTransport::new();
 ///
-/// let client = PubNubClientBuilder::with_transport(MyTransport)
+/// let pubnub = PubNubClientBuilder::with_transport(MyTransport)
 ///    .with_keyset(Keyset {
 ///         publish_key: Some("pub-c-abc123"),
 ///         subscribe_key: "sub-c-abc123",
@@ -417,7 +417,7 @@ impl<T, D> PubNubClientInstance<T, D> {
     /// use pubnub::{PubNubClient, PubNubClientBuilder, Keyset};
     ///
     /// # fn main() -> Result<(), pubnub::core::PubNubError> {
-    /// let client = // PubNubClient
+    /// let pubnub = // PubNubClient
     /// #     PubNubClientBuilder::with_reqwest_transport()
     /// #         .with_keyset(Keyset {
     /// #              subscribe_key: "demo",
@@ -426,7 +426,7 @@ impl<T, D> PubNubClientInstance<T, D> {
     /// #          })
     /// #         .with_user_id("uuid")
     /// #         .build()?;
-    /// let channel = client.channel("my_channel");
+    /// let channel = pubnub.channel("my_channel");
     /// #     Ok(())
     /// # }
     /// ```
@@ -464,7 +464,7 @@ impl<T, D> PubNubClientInstance<T, D> {
     /// use pubnub::{PubNubClient, PubNubClientBuilder, Keyset};
     ///
     /// # fn main() -> Result<(), pubnub::core::PubNubError> {
-    /// let client = // PubNubClient
+    /// let pubnub = // PubNubClient
     /// #     PubNubClientBuilder::with_reqwest_transport()
     /// #         .with_keyset(Keyset {
     /// #              subscribe_key: "demo",
@@ -473,7 +473,7 @@ impl<T, D> PubNubClientInstance<T, D> {
     /// #          })
     /// #         .with_user_id("uuid")
     /// #         .build()?;
-    /// let channel = client.channels(&["my_channel_1", "my_channel_2"]);
+    /// let channels = pubnub.channels(&["my_channel_1", "my_channel_2"]);
     /// #     Ok(())
     /// # }
     /// ```
@@ -517,7 +517,7 @@ impl<T, D> PubNubClientInstance<T, D> {
     /// use pubnub::{PubNubClient, PubNubClientBuilder, Keyset};
     ///
     /// # fn main() -> Result<(), pubnub::core::PubNubError> {
-    /// let client = // PubNubClient
+    /// let pubnub = // PubNubClient
     /// #     PubNubClientBuilder::with_reqwest_transport()
     /// #         .with_keyset(Keyset {
     /// #              subscribe_key: "demo",
@@ -526,7 +526,7 @@ impl<T, D> PubNubClientInstance<T, D> {
     /// #          })
     /// #         .with_user_id("uuid")
     /// #         .build()?;
-    /// let channel_group = client.channel_group("my_group");
+    /// let channel_group = pubnub.channel_group("my_group");
     /// #     Ok(())
     /// # }
     /// ```
@@ -565,7 +565,7 @@ impl<T, D> PubNubClientInstance<T, D> {
     /// use pubnub::{PubNubClient, PubNubClientBuilder, Keyset};
     ///
     /// # fn main() -> Result<(), pubnub::core::PubNubError> {
-    /// let client = // PubNubClient
+    /// let pubnub = // PubNubClient
     /// #     PubNubClientBuilder::with_reqwest_transport()
     /// #         .with_keyset(Keyset {
     /// #              subscribe_key: "demo",
@@ -574,7 +574,7 @@ impl<T, D> PubNubClientInstance<T, D> {
     /// #          })
     /// #         .with_user_id("uuid")
     /// #         .build()?;
-    /// let channel_groups = client.channel_groups(&["my_group_1", "my_group_2"]);
+    /// let channel_groups = pubnub.channel_groups(&["my_group_1", "my_group_2"]);
     /// #     Ok(())
     /// # }
     /// ```
@@ -620,7 +620,7 @@ impl<T, D> PubNubClientInstance<T, D> {
     /// use pubnub::{PubNubClient, PubNubClientBuilder, Keyset};
     ///
     /// # fn main() -> Result<(), pubnub::core::PubNubError> {
-    /// let client = // PubNubClient
+    /// let pubnub = // PubNubClient
     /// #     PubNubClientBuilder::with_reqwest_transport()
     /// #         .with_keyset(Keyset {
     /// #              subscribe_key: "demo",
@@ -629,7 +629,7 @@ impl<T, D> PubNubClientInstance<T, D> {
     /// #          })
     /// #         .with_user_id("uuid")
     /// #         .build()?;
-    /// let channel_metadata = client.channel_metadata("channel_meta");
+    /// let channel_metadata = pubnub.channel_metadata("channel_meta");
     /// #     Ok(())
     /// # }
     /// ```
@@ -670,7 +670,7 @@ impl<T, D> PubNubClientInstance<T, D> {
     /// use pubnub::{PubNubClient, PubNubClientBuilder, Keyset};
     ///
     /// # fn main() -> Result<(), pubnub::core::PubNubError> {
-    /// let client = // PubNubClient
+    /// let pubnub = // PubNubClient
     /// #     PubNubClientBuilder::with_reqwest_transport()
     /// #         .with_keyset(Keyset {
     /// #              subscribe_key: "demo",
@@ -679,7 +679,7 @@ impl<T, D> PubNubClientInstance<T, D> {
     /// #          })
     /// #         .with_user_id("uuid")
     /// #         .build()?;
-    /// let channels_metadata = client.channels_metadata(
+    /// let channels_metadata = pubnub.channels_metadata(
     ///     &["channel_meta_1", "channel_meta_2"]
     /// );
     /// #     Ok(())
@@ -727,7 +727,7 @@ impl<T, D> PubNubClientInstance<T, D> {
     /// use pubnub::{PubNubClient, PubNubClientBuilder, Keyset};
     ///
     /// # fn main() -> Result<(), pubnub::core::PubNubError> {
-    /// let client = // PubNubClient
+    /// let pubnub = // PubNubClient
     /// #     PubNubClientBuilder::with_reqwest_transport()
     /// #         .with_keyset(Keyset {
     /// #              subscribe_key: "demo",
@@ -736,7 +736,7 @@ impl<T, D> PubNubClientInstance<T, D> {
     /// #          })
     /// #         .with_user_id("uuid")
     /// #         .build()?;
-    /// let user_metadata = client.user_metadata("user_meta");
+    /// let user_metadata = pubnub.user_metadata("user_meta");
     /// #     Ok(())
     /// # }
     /// ```
@@ -776,7 +776,7 @@ impl<T, D> PubNubClientInstance<T, D> {
     /// use pubnub::{PubNubClient, PubNubClientBuilder, Keyset};
     ///
     /// # fn main() -> Result<(), pubnub::core::PubNubError> {
-    /// let client = // PubNubClient
+    /// let pubnub = // PubNubClient
     /// #     PubNubClientBuilder::with_reqwest_transport()
     /// #         .with_keyset(Keyset {
     /// #              subscribe_key: "demo",
@@ -785,7 +785,7 @@ impl<T, D> PubNubClientInstance<T, D> {
     /// #          })
     /// #         .with_user_id("uuid")
     /// #         .build()?;
-    /// let users_metadata = client.users_metadata(&["user_meta_1", "user_meta_2"]);
+    /// let users_metadata = pubnub.users_metadata(&["user_meta_1", "user_meta_2"]);
     /// #     Ok(())
     /// # }
     /// ```
@@ -823,7 +823,7 @@ impl<T, D> PubNubClientInstance<T, D> {
     ///
     /// # fn main() -> Result<(), pubnub::core::PubNubError> {
     /// let token = "<auth token from grant_token>";
-    /// let client = // PubNubClient
+    /// let pubnub = // PubNubClient
     /// #     PubNubClientBuilder::with_reqwest_transport()
     /// #         .with_keyset(Keyset {
     /// #              subscribe_key: "demo",
@@ -832,7 +832,7 @@ impl<T, D> PubNubClientInstance<T, D> {
     /// #          })
     /// #         .with_user_id("uuid")
     /// #         .build()?;
-    /// client.set_token(token);
+    /// pubnub.set_token(token);
     /// // Now client has access to all endpoints for which `token` has
     /// // permissions.
     /// #     Ok(())
@@ -854,7 +854,7 @@ impl<T, D> PubNubClientInstance<T, D> {
     ///
     /// # fn main() -> Result<(), pubnub::core::PubNubError> {
     /// #     let token = "<auth token from grant_token>";
-    /// let client = // PubNubClient
+    /// let pubnub = // PubNubClient
     /// #     PubNubClientBuilder::with_reqwest_transport()
     /// #         .with_keyset(Keyset {
     /// #              subscribe_key: "demo",
@@ -863,8 +863,8 @@ impl<T, D> PubNubClientInstance<T, D> {
     /// #          })
     /// #         .with_user_id("uuid")
     /// #         .build()?;
-    /// #     client.set_token(token);
-    /// println!("Current authentication token: {:?}", client.get_token());
+    /// #     pubnub.set_token(token);
+    /// println!("Current authentication token: {:?}", pubnub.get_token());
     /// // Now client has access to all endpoints for which `token` has
     /// // permissions.
     /// #     Ok(())
@@ -1293,7 +1293,7 @@ impl PubNubClientBuilder {
     /// // note that MyTransport must implement the `Transport` trait
     /// let transport = MyTransport::new();
     ///
-    /// let client = PubNubClientBuilder::with_transport(transport)
+    /// let pubnub = PubNubClientBuilder::with_transport(transport)
     ///     .with_keyset(Keyset {
     ///         publish_key: Some("pub-c-abc123"),
     ///         subscribe_key: "sub-c-abc123",
@@ -1343,7 +1343,7 @@ impl PubNubClientBuilder {
     /// // note that MyTransport must implement the `Transport` trait
     /// let transport = MyTransport::new();
     ///
-    /// let client = PubNubClientBuilder::with_transport(transport)
+    /// let pubnub = PubNubClientBuilder::with_transport(transport)
     ///     .with_keyset(Keyset {
     ///         publish_key: Some("pub-c-abc123"),
     ///         subscribe_key: "sub-c-abc123",
@@ -1392,7 +1392,7 @@ impl PubNubClientBuilder {
     /// // note that MyTransport must implement the `Transport` trait
     /// let transport = MyTransport::new();
     ///
-    /// let client = PubNubClientBuilder::with_blocking_transport(transport)
+    /// let pubnub = PubNubClientBuilder::with_blocking_transport(transport)
     ///     .with_keyset(Keyset {
     ///         publish_key: Some("pub-c-abc123"),
     ///         subscribe_key: "sub-c-abc123",
@@ -1445,7 +1445,7 @@ impl PubNubClientBuilder {
     /// // note that MyTransport must implement the `Transport` trait
     /// let transport = MyTransport::new();
     ///
-    /// let client = PubNubClientBuilder::with_blocking_transport(transport)
+    /// let pubnub = PubNubClientBuilder::with_blocking_transport(transport)
     ///     .with_keyset(Keyset {
     ///         publish_key: Some("pub-c-abc123"),
     ///         subscribe_key: "sub-c-abc123",
@@ -1591,7 +1591,7 @@ impl<T> PubNubClientRuntimeBuilder<T> {
     /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
     /// // note that with_reqwest_transport is only available when
     /// // the `reqwest` feature is enabled (default)
-    /// let client = PubNubClientBuilder::with_reqwest_transport()
+    /// let pubnub = PubNubClientBuilder::with_reqwest_transport()
     ///     .with_runtime(MyRuntime)
     ///     .with_keyset(Keyset {
     ///         subscribe_key: "sub-c-abc123",
@@ -1653,7 +1653,7 @@ impl<T> PubNubClientRuntimeBuilder<T> {
     /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
     /// // note that with_reqwest_transport is only available when
     /// // the `reqwest` feature is enabled (default)
-    /// let client = PubNubClientBuilder::with_reqwest_transport()
+    /// let pubnub = PubNubClientBuilder::with_reqwest_transport()
     ///     .with_runtime(MyRuntime)
     ///     .with_keyset(Keyset {
     ///         subscribe_key: "sub-c-abc123",
diff --git a/src/dx/subscribe/event_dispatcher.rs b/src/dx/subscribe/event_dispatcher.rs
index dfd603a7..bb2007a5 100644
--- a/src/dx/subscribe/event_dispatcher.rs
+++ b/src/dx/subscribe/event_dispatcher.rs
@@ -1,7 +1,7 @@
 //! # Event dispatcher module
 //!
 //! This module contains the [`EventDispatcher`] type, which is used by
-//! [`PubNubClientInstance`], [`Subscription2`] and [`SubscriptionSet`] to let
+//! [`PubNubClientInstance`], [`Subscription`] and [`SubscriptionSet`] to let
 //! users attach listeners to the specific event types.
 
 use spin::{RwLock, RwLockReadGuard, RwLockWriteGuard};
diff --git a/src/dx/subscribe/event_engine/effect_handler.rs b/src/dx/subscribe/event_engine/effect_handler.rs
index 5c5b2104..462512ae 100644
--- a/src/dx/subscribe/event_engine/effect_handler.rs
+++ b/src/dx/subscribe/event_engine/effect_handler.rs
@@ -1,4 +1,5 @@
 use async_channel::Sender;
+use spin::rwlock::RwLock;
 use uuid::Uuid;
 
 use crate::core::RequestRetryConfiguration;
@@ -60,6 +61,7 @@ impl EffectHandler<SubscribeEffectInvocation, SubscribeEffect> for SubscribeEffe
             SubscribeEffectInvocation::Handshake { input, cursor } => {
                 Some(SubscribeEffect::Handshake {
                     id: Uuid::new_v4().to_string(),
+                    cancelled: RwLock::new(false),
                     input: input.clone(),
                     cursor: cursor.clone(),
                     executor: self.subscribe_call.clone(),
@@ -73,6 +75,7 @@ impl EffectHandler<SubscribeEffectInvocation, SubscribeEffect> for SubscribeEffe
                 reason,
             } => Some(SubscribeEffect::HandshakeReconnect {
                 id: Uuid::new_v4().to_string(),
+                cancelled: RwLock::new(false),
                 input: input.clone(),
                 cursor: cursor.clone(),
                 attempts: *attempts,
@@ -84,6 +87,7 @@ impl EffectHandler<SubscribeEffectInvocation, SubscribeEffect> for SubscribeEffe
             SubscribeEffectInvocation::Receive { input, cursor } => {
                 Some(SubscribeEffect::Receive {
                     id: Uuid::new_v4().to_string(),
+                    cancelled: RwLock::new(false),
                     input: input.clone(),
                     cursor: cursor.clone(),
                     executor: self.subscribe_call.clone(),
@@ -97,6 +101,7 @@ impl EffectHandler<SubscribeEffectInvocation, SubscribeEffect> for SubscribeEffe
                 reason,
             } => Some(SubscribeEffect::ReceiveReconnect {
                 id: Uuid::new_v4().to_string(),
+                cancelled: RwLock::new(false),
                 input: input.clone(),
                 cursor: cursor.clone(),
                 attempts: *attempts,
diff --git a/src/dx/subscribe/event_engine/effects/handshake.rs b/src/dx/subscribe/event_engine/effects/handshake.rs
index cc0a3315..c0fab74b 100644
--- a/src/dx/subscribe/event_engine/effects/handshake.rs
+++ b/src/dx/subscribe/event_engine/effects/handshake.rs
@@ -1,6 +1,7 @@
 use futures::TryFutureExt;
 use log::info;
 
+use crate::core::PubNubError;
 use crate::subscribe::SubscriptionCursor;
 use crate::{
     dx::subscribe::event_engine::{
@@ -36,7 +37,11 @@ pub(super) async fn execute(
     .map_ok_or_else(
         |error| {
             log::error!("Handshake error: {:?}", error);
-            vec![SubscribeEvent::HandshakeFailure { reason: error }]
+
+            // Cancel is possible and no retries should be done.
+            (!matches!(error, PubNubError::EffectCanceled))
+                .then(|| vec![SubscribeEvent::HandshakeFailure { reason: error }])
+                .unwrap_or(vec![])
         },
         |subscribe_result| {
             let cursor = {
@@ -126,4 +131,23 @@ mod should {
             SubscribeEvent::HandshakeFailure { .. }
         ));
     }
+
+    #[tokio::test]
+    async fn return_empty_event_on_effect_cancel_err() {
+        let mock_handshake_function: Arc<SubscribeEffectExecutor> =
+            Arc::new(move |_| async move { Err(PubNubError::EffectCanceled) }.boxed());
+
+        let result = execute(
+            &SubscriptionInput::new(
+                &Some(vec!["ch1".to_string()]),
+                &Some(vec!["cg1".to_string()]),
+            ),
+            &None,
+            "id",
+            &mock_handshake_function,
+        )
+        .await;
+
+        assert!(result.is_empty());
+    }
 }
diff --git a/src/dx/subscribe/event_engine/effects/handshake_reconnection.rs b/src/dx/subscribe/event_engine/effects/handshake_reconnection.rs
index 8a89ba9f..17651655 100644
--- a/src/dx/subscribe/event_engine/effects/handshake_reconnection.rs
+++ b/src/dx/subscribe/event_engine/effects/handshake_reconnection.rs
@@ -21,7 +21,9 @@ pub(super) async fn execute(
     retry_policy: &RequestRetryConfiguration,
     executor: &Arc<SubscribeEffectExecutor>,
 ) -> Vec<SubscribeEvent> {
-    if !retry_policy.retriable(Some("/v2/subscribe"), &attempt, Some(&reason)) {
+    if !matches!(reason, PubNubError::EffectCanceled)
+        && !retry_policy.retriable(Some("/v2/subscribe"), &attempt, Some(&reason))
+    {
         return vec![SubscribeEvent::HandshakeReconnectGiveUp { reason }];
     }
 
@@ -176,6 +178,38 @@ mod should {
         ));
     }
 
+    #[tokio::test]
+    async fn return_empty_event_on_effect_cancel_err() {
+        let mock_handshake_function: Arc<SubscribeEffectExecutor> =
+            Arc::new(move |_| async move { Err(PubNubError::EffectCanceled) }.boxed());
+
+        let result = execute(
+            &SubscriptionInput::new(
+                &Some(vec!["ch1".to_string()]),
+                &Some(vec!["cg1".to_string()]),
+            ),
+            &None,
+            1,
+            PubNubError::Transport {
+                details: "test".into(),
+                response: Some(Box::new(TransportResponse {
+                    status: 500,
+                    ..Default::default()
+                })),
+            },
+            "id",
+            &RequestRetryConfiguration::Linear {
+                delay: 0,
+                max_retry: 1,
+                excluded_endpoints: None,
+            },
+            &mock_handshake_function,
+        )
+        .await;
+
+        assert!(result.is_empty());
+    }
+
     #[tokio::test]
     async fn return_handshake_reconnect_give_up_event_on_err_with_none_auto_retry_policy() {
         let mock_handshake_function: Arc<SubscribeEffectExecutor> = Arc::new(move |_| {
diff --git a/src/dx/subscribe/event_engine/effects/mod.rs b/src/dx/subscribe/event_engine/effects/mod.rs
index a51f9199..f5893c97 100644
--- a/src/dx/subscribe/event_engine/effects/mod.rs
+++ b/src/dx/subscribe/event_engine/effects/mod.rs
@@ -2,6 +2,7 @@
 
 use async_channel::Sender;
 use futures::future::BoxFuture;
+use spin::RwLock;
 
 use crate::{
     core::{event_engine::Effect, PubNubError, RequestRetryConfiguration},
@@ -65,6 +66,9 @@ pub(crate) enum SubscribeEffect {
         /// Unique effect identifier.
         id: String,
 
+        /// Whether handshake effect has been cancelled or not.
+        cancelled: RwLock<bool>,
+
         /// User input with channels and groups.
         ///
         /// Object contains list of channels and channel groups which will be
@@ -93,6 +97,9 @@ pub(crate) enum SubscribeEffect {
         /// Unique effect identifier.
         id: String,
 
+        /// Whether handshake reconnect effect has been cancelled or not.
+        cancelled: RwLock<bool>,
+
         /// User input with channels and groups.
         ///
         /// Object contains list of channels and channel groups which has been
@@ -132,6 +139,9 @@ pub(crate) enum SubscribeEffect {
         /// Unique effect identifier.
         id: String,
 
+        /// Whether receive effect has been cancelled or not.
+        cancelled: RwLock<bool>,
+
         /// User input with channels and groups.
         ///
         /// Object contains list of channels and channel groups for which
@@ -160,6 +170,9 @@ pub(crate) enum SubscribeEffect {
         /// Unique effect identifier.
         id: String,
 
+        /// Whether receive reconnect effect has been cancelled or not.
+        cancelled: RwLock<bool>,
+
         /// User input with channels and groups.
         ///
         /// Object contains list of channels and channel groups which has been
@@ -385,24 +398,32 @@ impl Effect for SubscribeEffect {
         match self {
             Self::Handshake {
                 id,
+                cancelled,
                 cancellation_channel,
                 ..
             }
             | Self::HandshakeReconnect {
                 id,
+                cancelled,
                 cancellation_channel,
                 ..
             }
             | Self::Receive {
                 id,
+                cancelled,
                 cancellation_channel,
                 ..
             }
             | Self::ReceiveReconnect {
                 id,
+                cancelled,
                 cancellation_channel,
                 ..
             } => {
+                {
+                    let mut cancelled_slot = cancelled.write();
+                    *cancelled_slot = true;
+                }
                 cancellation_channel
                     .send_blocking(id.clone())
                     .expect("cancellation pipe is broken!");
@@ -410,6 +431,16 @@ impl Effect for SubscribeEffect {
             _ => { /* cannot cancel other effects */ }
         }
     }
+
+    fn is_cancelled(&self) -> bool {
+        match self {
+            Self::Handshake { cancelled, .. }
+            | Self::HandshakeReconnect { cancelled, .. }
+            | Self::Receive { cancelled, .. }
+            | Self::ReceiveReconnect { cancelled, .. } => *cancelled.read(),
+            _ => false,
+        }
+    }
 }
 
 #[cfg(test)]
@@ -424,6 +455,7 @@ mod should {
 
         let effect = SubscribeEffect::Handshake {
             id: Uuid::new_v4().to_string(),
+            cancelled: RwLock::new(false),
             input: SubscriptionInput::new(&None, &None),
             cursor: None,
             executor: Arc::new(|_| {
diff --git a/src/dx/subscribe/event_engine/effects/receive.rs b/src/dx/subscribe/event_engine/effects/receive.rs
index c3dc2402..fdd4cd42 100644
--- a/src/dx/subscribe/event_engine/effects/receive.rs
+++ b/src/dx/subscribe/event_engine/effects/receive.rs
@@ -128,4 +128,23 @@ mod should {
             SubscribeEvent::ReceiveFailure { .. }
         ));
     }
+
+    #[tokio::test]
+    async fn return_empty_event_on_effect_cancel_err() {
+        let mock_receive_function: Arc<SubscribeEffectExecutor> =
+            Arc::new(move |_| async move { Err(PubNubError::EffectCanceled) }.boxed());
+
+        let result = execute(
+            &SubscriptionInput::new(
+                &Some(vec!["ch1".to_string()]),
+                &Some(vec!["cg1".to_string()]),
+            ),
+            &Default::default(),
+            "id",
+            &mock_receive_function,
+        )
+        .await;
+
+        assert!(result.is_empty());
+    }
 }
diff --git a/src/dx/subscribe/event_engine/effects/receive_reconnection.rs b/src/dx/subscribe/event_engine/effects/receive_reconnection.rs
index ca7efa7d..5e0d8e36 100644
--- a/src/dx/subscribe/event_engine/effects/receive_reconnection.rs
+++ b/src/dx/subscribe/event_engine/effects/receive_reconnection.rs
@@ -23,7 +23,9 @@ pub(crate) async fn execute(
     retry_policy: &RequestRetryConfiguration,
     executor: &Arc<SubscribeEffectExecutor>,
 ) -> Vec<SubscribeEvent> {
-    if !retry_policy.retriable(Some("/v2/subscribe"), &attempt, Some(&reason)) {
+    if !matches!(reason, PubNubError::EffectCanceled)
+        && !retry_policy.retriable(Some("/v2/subscribe"), &attempt, Some(&reason))
+    {
         return vec![SubscribeEvent::ReceiveReconnectGiveUp { reason }];
     }
 
@@ -225,6 +227,38 @@ mod should {
         ));
     }
 
+    #[tokio::test]
+    async fn return_empty_event_on_effect_cancel_err() {
+        let mock_receive_function: Arc<SubscribeEffectExecutor> =
+            Arc::new(move |_| async move { Err(PubNubError::EffectCanceled) }.boxed());
+
+        let result = execute(
+            &SubscriptionInput::new(
+                &Some(vec!["ch1".to_string()]),
+                &Some(vec!["cg1".to_string()]),
+            ),
+            &Default::default(),
+            10,
+            PubNubError::Transport {
+                details: "test".into(),
+                response: Some(Box::new(TransportResponse {
+                    status: 500,
+                    ..Default::default()
+                })),
+            },
+            "id",
+            &RequestRetryConfiguration::Linear {
+                max_retry: 20,
+                delay: 0,
+                excluded_endpoints: None,
+            },
+            &mock_receive_function,
+        )
+        .await;
+
+        assert!(result.is_empty());
+    }
+
     #[tokio::test]
     async fn return_receive_reconnect_give_up_event_on_err_with_none_auto_retry_policy() {
         let mock_receive_function: Arc<SubscribeEffectExecutor> = Arc::new(move |_| {
diff --git a/src/dx/subscribe/event_engine/state.rs b/src/dx/subscribe/event_engine/state.rs
index 6174e196..f5debe80 100644
--- a/src/dx/subscribe/event_engine/state.rs
+++ b/src/dx/subscribe/event_engine/state.rs
@@ -231,7 +231,10 @@ impl SubscribeState {
                         input: SubscriptionInput::new(channels, channel_groups),
                         cursor: cursor.clone(),
                     }),
-                    None,
+                    Some(vec![EmitStatus(ConnectionStatus::SubscriptionChanged {
+                        channels: channels.clone(),
+                        channel_groups: channel_groups.clone(),
+                    })]),
                 ))
             }
             Self::ReceiveFailed { cursor, .. } => Some(self.transition_to(
@@ -290,7 +293,10 @@ impl SubscribeState {
                     input: SubscriptionInput::new(channels, channel_groups),
                     cursor: restore_cursor.clone(),
                 }),
-                None,
+                Some(vec![EmitStatus(ConnectionStatus::SubscriptionChanged {
+                    channels: channels.clone(),
+                    channel_groups: channel_groups.clone(),
+                })]),
             )),
             Self::ReceiveFailed { .. } => Some(self.transition_to(
                 Some(Self::Handshaking {
diff --git a/src/dx/subscribe/mod.rs b/src/dx/subscribe/mod.rs
index e01a8c33..27c4c8e0 100644
--- a/src/dx/subscribe/mod.rs
+++ b/src/dx/subscribe/mod.rs
@@ -148,7 +148,7 @@ where
     ///
     /// # #[tokio::main]
     /// # async fn main() -> Result<(), pubnub::core::PubNubError> {
-    /// let client = // PubNubClient
+    /// let pubnub = // PubNubClient
     /// #     PubNubClientBuilder::with_reqwest_transport()
     /// #         .with_keyset(Keyset {
     /// #              subscribe_key: "demo",
@@ -160,8 +160,8 @@ where
     /// // ...
     /// // We need to pass client into other component which would like to
     /// // have own listeners to handle real-time events.
-    /// let empty_client = client.clone_empty();
-    /// // self.other_component(empty_client);    
+    /// let empty_pubnub_client = pubnub.clone_empty();
+    /// // self.other_component(empty_pubnub_client);    
     /// #     Ok(())
     /// # }
     /// ```
@@ -199,12 +199,16 @@ where
     ///
     /// ```rust,no_run
     /// use futures::StreamExt;
-    /// use pubnub::{PubNubClient, PubNubClientBuilder, Keyset, subscribe::EventEmitter};
+    /// use pubnub::{
+    ///     subscribe::{
+    ///         EventEmitter, {EventSubscriber, SubscriptionParams},
+    ///     },
+    ///     Keyset, PubNubClient, PubNubClientBuilder,
+    /// };
     ///
     /// # #[tokio::main]
     /// # async fn main() -> Result<(), pubnub::core::PubNubError> {
-    /// use pubnub::subscribe::{EventSubscriber, SubscriptionParams};
-    /// let client = // PubNubClient
+    /// let pubnub = // PubNubClient
     /// #     PubNubClientBuilder::with_reqwest_transport()
     /// #         .with_keyset(Keyset {
     /// #              subscribe_key: "demo",
@@ -213,7 +217,7 @@ where
     /// #          })
     /// #         .with_user_id("uuid")
     /// #         .build()?;
-    /// let subscription = client.subscription(SubscriptionParams {
+    /// let subscription = pubnub.subscription(SubscriptionParams {
     ///     channels: Some(&["my_channel_1", "my_channel_2", "my_channel_3"]),
     ///     channel_groups: None,
     ///     options: None
@@ -257,14 +261,16 @@ where
     ///
     /// ```no_run
     /// use futures::StreamExt;
-    /// use pubnub::dx::subscribe::{EventEmitter, SubscribeStreamEvent, Update};
+    /// use pubnub::{
+    ///     subscribe::{
+    ///         EventEmitter, {EventSubscriber, SubscriptionParams, Update},
+    ///     },
+    ///     Keyset, PubNubClient, PubNubClientBuilder,
+    /// };
     ///
     /// # #[tokio::main]
     /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
-    /// #     use pubnub::{Keyset, PubNubClientBuilder};
-    /// use pubnub::subscribe::SubscriptionParams;
-    /// #
-    /// #     let client = PubNubClientBuilder::with_reqwest_transport()
+    /// #     let pubnub = PubNubClientBuilder::with_reqwest_transport()
     /// #         .with_keyset(Keyset {
     /// #             subscribe_key: "demo",
     /// #             publish_key: Some("demo"),
@@ -272,14 +278,14 @@ where
     /// #         })
     /// #         .with_user_id("user_id")
     /// #         .build()?;
-    /// # let subscription = client.subscription(SubscriptionParams {
+    /// # let subscription = pubnub.subscription(SubscriptionParams {
     /// #     channels: Some(&["channel"]),
     /// #     channel_groups: None,
     /// #     options: None
     /// # });
     /// # let stream = // DataStream<Message>
     /// #     subscription.messages_stream();
-    /// client.disconnect();
+    /// pubnub.disconnect();
     /// # Ok(())
     /// # }
     /// ```
@@ -330,14 +336,16 @@ where
     ///
     /// ```no_run
     /// use futures::StreamExt;
-    /// use pubnub::dx::subscribe::{EventEmitter, SubscribeStreamEvent, Update};
+    /// use pubnub::{
+    ///     subscribe::{
+    ///         EventEmitter, {EventSubscriber, SubscriptionParams, Update},
+    ///     },
+    ///     Keyset, PubNubClient, PubNubClientBuilder,
+    /// };
     ///
     /// # #[tokio::main]
     /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
-    /// #     use pubnub::{Keyset, PubNubClientBuilder};
-    /// use pubnub::subscribe::SubscriptionParams;
-    /// #
-    /// #     let client = PubNubClientBuilder::with_reqwest_transport()
+    /// #     let pubnub = PubNubClientBuilder::with_reqwest_transport()
     /// #         .with_keyset(Keyset {
     /// #             subscribe_key: "demo",
     /// #             publish_key: Some("demo"),
@@ -345,7 +353,7 @@ where
     /// #         })
     /// #         .with_user_id("user_id")
     /// #         .build()?;
-    /// # let subscription = client.subscription(SubscriptionParams {
+    /// # let subscription = pubnub.subscription(SubscriptionParams {
     /// #     channels: Some(&["channel"]),
     /// #     channel_groups: None,
     /// #     options: None
@@ -353,8 +361,8 @@ where
     /// # let stream = // DataStream<Message>
     /// #     subscription.messages_stream();
     /// # // .....
-    /// # client.disconnect();
-    /// client.reconnect(None);
+    /// # pubnub.disconnect();
+    /// pubnub.reconnect(None);
     /// # Ok(())
     /// # }
     /// ```
@@ -363,6 +371,7 @@ where
     pub fn reconnect(&self, cursor: Option<SubscriptionCursor>) {
         #[cfg(feature = "presence")]
         let mut input: Option<SubscriptionInput> = None;
+        let cursor = cursor.or_else(|| self.cursor.read().clone());
 
         if let Some(manager) = self.subscription_manager(false).read().as_ref() {
             #[cfg(feature = "presence")]
@@ -403,6 +412,9 @@ where
     /// created [`Subscription`] and [`SubscriptionSet`].
     pub fn unsubscribe_all(&self) {
         {
+            let mut cursor = self.cursor.write();
+            *cursor = None;
+
             if let Some(manager) = self.subscription_manager(false).write().as_mut() {
                 manager.unregister_all()
             }
@@ -624,13 +636,16 @@ impl<T, D> PubNubClientInstance<T, D> {
     ///
     /// ```no_run // Starts listening for real-time updates
     /// use futures::StreamExt;
-    /// use pubnub::dx::subscribe::{SubscribeStreamEvent, Update};
+    /// use pubnub::{
+    ///     subscribe::{
+    ///         EventEmitter, {EventSubscriber, SubscriptionParams, Update},
+    ///     },
+    ///     Keyset, PubNubClient, PubNubClientBuilder,
+    /// };
     ///
     /// # #[tokio::main]
     /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
-    /// # use pubnub::{Keyset, PubNubClientBuilder};
-    /// #
-    /// #   let client = PubNubClientBuilder::with_reqwest_transport()
+    /// #   let pubnub = PubNubClientBuilder::with_reqwest_transport()
     /// #      .with_keyset(Keyset {
     /// #          subscribe_key: "demo",
     /// #          publish_key: Some("demo"),
@@ -638,7 +653,7 @@ impl<T, D> PubNubClientInstance<T, D> {
     /// #      })
     /// #      .with_user_id("user_id")
     /// #      .build()?;
-    /// client
+    /// pubnub
     ///     .subscribe_raw()
     ///     .channels(["hello".into(), "world".into()].to_vec())
     ///     .execute()?
@@ -900,7 +915,7 @@ mod should {
             channel_groups: Some(&["group_a"]),
             options: Some(vec![SubscriptionOptions::ReceivePresenceEvents]),
         });
-        subscription.subscribe(None);
+        subscription.subscribe();
 
         let status = client.status_stream().next().await.unwrap();
         let _ = subscription.messages_stream().next().await.unwrap();
diff --git a/src/dx/subscribe/subscription.rs b/src/dx/subscribe/subscription.rs
index 3196b248..9bca92ff 100644
--- a/src/dx/subscribe/subscription.rs
+++ b/src/dx/subscribe/subscription.rs
@@ -44,7 +44,7 @@ use crate::{
 /// };
 ///
 /// # fn main() -> Result<(), pubnub::core::PubNubError> {
-/// let client = // PubNubClient
+/// let pubnub = // PubNubClient
 /// #     PubNubClientBuilder::with_reqwest_transport()
 /// #         .with_keyset(Keyset {
 /// #              subscribe_key: "demo",
@@ -53,7 +53,7 @@ use crate::{
 /// #          })
 /// #         .with_user_id("uuid")
 /// #         .build()?;
-/// let channel = client.channel("my_channel");
+/// let channel = pubnub.channel("my_channel");
 /// // Creating Subscription instance for the Channel entity to subscribe and listen
 /// // for real-time events.
 /// let subscription = channel.subscription(None);
@@ -72,7 +72,7 @@ use crate::{
 /// };
 ///
 /// # fn main() -> Result<(), pubnub::core::PubNubError> {
-/// let client = // PubNubClient
+/// let pubnub = // PubNubClient
 /// #     PubNubClientBuilder::with_reqwest_transport()
 /// #         .with_keyset(Keyset {
 /// #              subscribe_key: "demo",
@@ -81,7 +81,7 @@ use crate::{
 /// #          })
 /// #         .with_user_id("uuid")
 /// #         .build()?;
-/// let channels = client.channels(&["my_channel_1", "my_channel_2"]);
+/// let channels = pubnub.channels(&["my_channel_1", "my_channel_2"]);
 /// // Two `Subscription` instances can be added to create `SubscriptionSet` which can be used
 /// // to attach listeners and subscribe in one place for both subscriptions used in addition
 /// // operation.
@@ -206,7 +206,7 @@ where
     ///
     /// # #[tokio::main]
     /// # async fn main() -> Result<(), pubnub::core::PubNubError> {
-    /// let client = // PubNubClient
+    /// let pubnub = // PubNubClient
     /// #     PubNubClientBuilder::with_reqwest_transport()
     /// #         .with_keyset(Keyset {
     /// #              subscribe_key: "demo",
@@ -215,7 +215,7 @@ where
     /// #          })
     /// #         .with_user_id("uuid")
     /// #         .build()?;
-    /// let channel = client.channel("my_channel");
+    /// let channel = pubnub.channel("my_channel");
     /// // Creating Subscription instance for the Channel entity to subscribe and listen
     /// // for real-time events.
     /// let subscription = channel.subscription(None);
@@ -425,6 +425,29 @@ where
         *self.is_subscribed.read()
     }
 
+    /// Register `Subscription` within `SubscriptionManager`.
+    ///
+    /// # Arguments
+    ///
+    /// - `cursor` - Subscription real-time events catch up cursor.
+    fn register_with_cursor(&self, cursor: Option<SubscriptionCursor>) {
+        let Some(client) = self.client.upgrade().clone() else {
+            return;
+        };
+
+        {
+            if let Some(manager) = client.subscription_manager(true).write().as_mut() {
+                // Mark entities as "in use" by subscription.
+                self.entity.increase_subscriptions_count();
+
+                if let Some((_, handler)) = self.clones.read().iter().next() {
+                    let handler: Weak<dyn EventHandler<T, D> + Send + Sync> = handler.clone();
+                    manager.register(&handler, cursor);
+                }
+            }
+        }
+    }
+
     /// Filters the given list of `Update` events based on the subscription
     /// input and the current timetoken.
     ///
@@ -482,36 +505,44 @@ where
     T: Transport + Send + Sync + 'static,
     D: Deserializer + Send + Sync + 'static,
 {
-    fn subscribe(&self, cursor: Option<SubscriptionCursor>) {
+    fn subscribe(&self) {
         let mut is_subscribed = self.is_subscribed.write();
         if *is_subscribed {
             return;
         }
         *is_subscribed = true;
 
-        if cursor.is_some() {
-            let mut cursor_slot = self.cursor.write();
-            if let Some(current_cursor) = cursor_slot.as_ref() {
-                let catchup_cursor = cursor.clone().unwrap_or_default();
-                catchup_cursor
-                    .gt(current_cursor)
-                    .then(|| *cursor_slot = Some(catchup_cursor));
-            } else {
-                *cursor_slot = cursor.clone();
-            }
+        self.register_with_cursor(self.cursor.read().clone());
+    }
+
+    fn subscribe_with_timetoken<SC>(&self, cursor: SC)
+    where
+        SC: Into<SubscriptionCursor>,
+    {
+        let mut is_subscribed = self.is_subscribed.write();
+        if *is_subscribed {
+            return;
         }
+        *is_subscribed = true;
 
-        if let Some(client) = self.client().upgrade().clone() {
-            if let Some(manager) = client.subscription_manager(true).write().as_mut() {
-                // Mark entities as "in use" by subscription.
-                self.entity.increase_subscriptions_count();
+        let user_cursor = cursor.into();
+        let cursor = user_cursor.is_valid().then_some(user_cursor);
 
-                if let Some((_, handler)) = self.clones.read().iter().next() {
-                    let handler: Weak<dyn EventHandler<T, D> + Send + Sync> = handler.clone();
-                    manager.register(&handler, cursor);
+        {
+            if cursor.is_some() {
+                let mut cursor_slot = self.cursor.write();
+                if let Some(current_cursor) = cursor_slot.as_ref() {
+                    let catchup_cursor = cursor.clone().unwrap_or_default();
+                    catchup_cursor
+                        .gt(current_cursor)
+                        .then(|| *cursor_slot = Some(catchup_cursor));
+                } else {
+                    *cursor_slot = cursor.clone();
                 }
             }
         }
+
+        self.register_with_cursor(cursor);
     }
 
     fn unsubscribe(&self) {
@@ -521,6 +552,9 @@ where
                 return;
             }
             *is_subscribed_slot = false;
+
+            let mut cursor = self.cursor.write();
+            *cursor = None;
         }
 
         if let Some(client) = self.client().upgrade().clone() {
diff --git a/src/dx/subscribe/subscription_set.rs b/src/dx/subscribe/subscription_set.rs
index 3e6d58ce..a967aceb 100644
--- a/src/dx/subscribe/subscription_set.rs
+++ b/src/dx/subscribe/subscription_set.rs
@@ -39,12 +39,11 @@ use crate::{
 /// ### Multiplexed subscription
 ///
 /// ```rust
-/// use pubnub::{PubNubClient, PubNubClientBuilder, Keyset};
+/// use pubnub::{subscribe::SubscriptionParams, Keyset, PubNubClient, PubNubClientBuilder};
 ///
 /// # #[tokio::main]
 /// # async fn main() -> Result<(), pubnub::core::PubNubError> {
-/// use pubnub::subscribe::SubscriptionParams;
-/// let client = // PubNubClient
+/// let pubnub = // PubNubClient
 /// #     PubNubClientBuilder::with_reqwest_transport()
 /// #         .with_keyset(Keyset {
 /// #              subscribe_key: "demo",
@@ -53,7 +52,7 @@ use crate::{
 /// #          })
 /// #         .with_user_id("uuid")
 /// #         .build()?;
-/// let subscription = client.subscription(SubscriptionParams {
+/// let subscription = pubnub.subscription(SubscriptionParams {
 ///     channels: Some(&["my_channel_1", "my_channel_2"]),
 ///     channel_groups:Some(&["my_group"]),
 ///     options:None
@@ -72,7 +71,7 @@ use crate::{
 ///
 /// # #[tokio::main]
 /// # async fn main() -> Result<(), pubnub::core::PubNubError> {
-/// let client = // PubNubClient
+/// let pubnub = // PubNubClient
 /// #     PubNubClientBuilder::with_reqwest_transport()
 /// #         .with_keyset(Keyset {
 /// #              subscribe_key: "demo",
@@ -81,7 +80,7 @@ use crate::{
 /// #          })
 /// #         .with_user_id("uuid")
 /// #         .build()?;
-/// let channels = client.channels(&["my_channel_1", "my_channel_2"]);
+/// let channels = pubnub.channels(&["my_channel_1", "my_channel_2"]);
 /// // Two `Subscription` instances can be added to create `SubscriptionSet` which can be used
 /// // to attach listeners and subscribe in one place for both subscriptions used in addition
 /// // operation.
@@ -231,12 +230,11 @@ where
     /// # Example
     ///
     /// ```rust
-    /// use pubnub::{PubNubClient, PubNubClientBuilder, Keyset};
+    /// use pubnub::{subscribe::SubscriptionParams, Keyset, PubNubClient, PubNubClientBuilder};
     ///
     /// # #[tokio::main]
     /// # async fn main() -> Result<(), pubnub::core::PubNubError> {
-    /// use pubnub::subscribe::SubscriptionParams;
-    /// let client = // PubNubClient
+    /// let pubnub = // PubNubClient
     /// #     PubNubClientBuilder::with_reqwest_transport()
     /// #         .with_keyset(Keyset {
     /// #              subscribe_key: "demo",
@@ -245,7 +243,7 @@ where
     /// #          })
     /// #         .with_user_id("uuid")
     /// #         .build()?;
-    /// let subscription = client.subscription(SubscriptionParams {
+    /// let subscription = pubnub.subscription(SubscriptionParams {
     ///     channels: Some(&["my_channel_1", "my_channel_2"]),
     ///     channel_groups: Some(&["my_group"]),
     ///     options: None
@@ -270,6 +268,171 @@ where
         }
     }
 
+    /// Adds a list of subscriptions to the subscription set.
+    ///
+    /// # Arguments
+    ///
+    /// * `subscriptions` - A vector of `Subscription` objects to be added to
+    ///   the subscription set.
+    ///
+    /// # Example
+    ///
+    /// ```rust
+    /// use pubnub::{
+    ///     subscribe::{Subscriber, SubscriptionParams},
+    ///     Keyset, PubNubClient, PubNubClientBuilder,
+    /// };
+    ///
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<(), pubnub::core::PubNubError> {
+    /// let pubnub = // PubNubClient
+    /// #     PubNubClientBuilder::with_reqwest_transport()
+    /// #         .with_keyset(Keyset {
+    /// #              subscribe_key: "demo",
+    /// #              publish_key: Some("demo"),
+    /// #              secret_key: Some("demo")
+    /// #          })
+    /// #         .with_user_id("uuid")
+    /// #         .build()?;
+    /// // Create subscription set for list of channels and groups.
+    /// let mut subscription = pubnub.subscription(SubscriptionParams {
+    ///     channels: Some(&["my_channel_1", "my_channel_2"]),
+    ///     channel_groups: Some(&["my_group"]),
+    ///     options: None
+    /// });
+    /// let channel = pubnub.channel("my_channel_3");
+    /// // Creating Subscription instance for the Channel entity to subscribe and listen
+    /// // for real-time events.
+    /// let channel_subscription = channel.subscription(None);
+    /// // It is possible to separately add listeners to the `channel_subscription`
+    /// // and call `subscribe()` or `subscribe_with_timetoken(..)`.
+    ///
+    /// // Add channel subscription to the set.
+    /// subscription.add_subscriptions(vec![channel_subscription]);
+    /// #     Ok(())
+    /// # }
+    /// ```
+    pub fn add_subscriptions(&mut self, subscriptions: Vec<Subscription<T, D>>) {
+        let unique_subscriptions =
+            { Self::unique_subscriptions_from_list(Some(self), subscriptions) };
+
+        {
+            let mut subscription_input = self.subscription_input.write();
+            *subscription_input += Self::subscription_input_from_list(&unique_subscriptions, true);
+            self.subscriptions
+                .write()
+                .extend(unique_subscriptions.clone());
+        }
+
+        // Check whether subscription change required or not.
+        if !self.is_subscribed() || unique_subscriptions.is_empty() {
+            return;
+        }
+
+        let Some(client) = self.client().upgrade().clone() else {
+            return;
+        };
+
+        if let Some(manager) = client.subscription_manager(true).write().as_mut() {
+            // Mark entities as "in-use" by subscription.
+            unique_subscriptions.iter().for_each(|subscription| {
+                subscription.entity.increase_subscriptions_count();
+            });
+
+            // Notify manager to update its state with new subscriptions.
+            if let Some((_, handler)) = self.clones.read().iter().next() {
+                let handler: Weak<dyn EventHandler<T, D> + Send + Sync> = handler.clone();
+                manager.update(&handler, None);
+            }
+        };
+    }
+
+    /// Subtracts the given subscriptions from the subscription set.
+    ///
+    /// # Arguments
+    ///
+    /// * `subscriptions` - A vector of `Subscription` objects to be removed
+    ///   from the subscription set.
+    ///
+    /// # Example
+    ///
+    /// ```rust
+    /// use pubnub::{
+    ///     subscribe::{Subscriber, SubscriptionParams},
+    ///     Keyset, PubNubClient, PubNubClientBuilder,
+    /// };
+    ///
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<(), pubnub::core::PubNubError> {
+    /// let pubnub = // PubNubClient
+    /// #     PubNubClientBuilder::with_reqwest_transport()
+    /// #         .with_keyset(Keyset {
+    /// #              subscribe_key: "demo",
+    /// #              publish_key: Some("demo"),
+    /// #              secret_key: Some("demo")
+    /// #          })
+    /// #         .with_user_id("uuid")
+    /// #         .build()?;
+    /// // Create subscription set for list of channels and groups.
+    /// let mut subscription = pubnub.subscription(SubscriptionParams {
+    ///     channels: Some(&["my_channel_1", "my_channel_2"]),
+    ///     channel_groups: Some(&["my_group"]),
+    ///     options: None
+    /// });
+    /// let channel = pubnub.channel("my_channel_3");
+    /// // Creating Subscription instance for the Channel entity to subscribe and listen
+    /// // for real-time events.
+    /// let channel_subscription = channel.subscription(None);
+    /// // It is possible to separately add listeners to the `channel_subscription`
+    /// // and call `subscribe()` or `subscribe_with_timetoken(..)`.
+    ///
+    /// // Add channel subscription to the set.
+    /// subscription.add_subscriptions(vec![channel_subscription.clone()]);
+    ///
+    /// // After some time it needed to remove subscriptions from the set.
+    /// subscription.sub_subscriptions(vec![channel_subscription]);
+    /// #     Ok(())
+    /// # }
+    /// ```
+    pub fn sub_subscriptions(&mut self, subscriptions: Vec<Subscription<T, D>>) {
+        let removed: Vec<Subscription<T, D>> = {
+            let subscriptions_slot = self.subscriptions.read();
+            Self::unique_subscriptions_from_list(None, subscriptions)
+                .into_iter()
+                .filter(|subscription| subscriptions_slot.contains(subscription))
+                .collect()
+        };
+
+        {
+            let mut subscription_input = self.subscription_input.write();
+            *subscription_input -= Self::subscription_input_from_list(&removed, true);
+            let mut subscription_slot = self.subscriptions.write();
+            subscription_slot.retain(|subscription| !removed.contains(subscription));
+        }
+
+        // Check whether subscription change required or not.
+        if !self.is_subscribed() || removed.is_empty() {
+            return;
+        }
+
+        let Some(client) = self.client().upgrade().clone() else {
+            return;
+        };
+
+        // Mark entities as "not in-use" by subscription.
+        removed.iter().for_each(|subscription| {
+            subscription.entity.decrease_subscriptions_count();
+        });
+
+        if let Some(manager) = client.subscription_manager(true).write().as_mut() {
+            // Notify manager to update its state with removed subscriptions.
+            if let Some((_, handler)) = self.clones.read().iter().next() {
+                let handler: Weak<dyn EventHandler<T, D> + Send + Sync> = handler.clone();
+                manager.update(&handler, Some(&removed));
+            }
+        };
+    }
+
     /// Aggregate subscriptions' input.
     ///
     /// # Arguments
@@ -447,40 +610,7 @@ where
     D: Deserializer + Send + Sync,
 {
     fn add_assign(&mut self, rhs: Self) {
-        let unique_subscriptions = {
-            let other_subscriptions = rhs.subscriptions.read();
-            SubscriptionSet::unique_subscriptions_from_list(Some(self), other_subscriptions.clone())
-        };
-
-        {
-            let mut subscription_input = self.subscription_input.write();
-            *subscription_input += Self::subscription_input_from_list(&unique_subscriptions, true);
-            self.subscriptions
-                .write()
-                .extend(unique_subscriptions.clone());
-        }
-
-        // Check whether subscription change required or not.
-        if !self.is_subscribed() || unique_subscriptions.is_empty() {
-            return;
-        }
-
-        let Some(client) = self.client().upgrade().clone() else {
-            return;
-        };
-
-        if let Some(manager) = client.subscription_manager(true).write().as_mut() {
-            // Mark entities as "in-use" by subscription.
-            unique_subscriptions.iter().for_each(|subscription| {
-                subscription.entity.increase_subscriptions_count();
-            });
-
-            // Notify manager to update its state with new subscriptions.
-            if let Some((_, handler)) = self.clones.read().iter().next() {
-                let handler: Weak<dyn EventHandler<T, D> + Send + Sync> = handler.clone();
-                manager.update(&handler, None);
-            }
-        };
+        self.add_subscriptions(rhs.subscriptions.read().clone());
     }
 }
 impl<T, D> Sub for SubscriptionSet<T, D>
@@ -511,43 +641,7 @@ where
     D: Deserializer + Send + Sync,
 {
     fn sub_assign(&mut self, rhs: Self) {
-        let removed: Vec<Subscription<T, D>> = {
-            let other_subscriptions = rhs.subscriptions.read();
-            let subscriptions_slot = self.subscriptions.read();
-            Self::unique_subscriptions_from_list(None, other_subscriptions.clone())
-                .into_iter()
-                .filter(|subscription| subscriptions_slot.contains(subscription))
-                .collect()
-        };
-
-        {
-            let mut subscription_input = self.subscription_input.write();
-            *subscription_input -= Self::subscription_input_from_list(&removed, true);
-            let mut subscription_slot = self.subscriptions.write();
-            subscription_slot.retain(|subscription| !removed.contains(subscription));
-        }
-
-        // Check whether subscription change required or not.
-        if !self.is_subscribed() || removed.is_empty() {
-            return;
-        }
-
-        let Some(client) = self.client().upgrade().clone() else {
-            return;
-        };
-
-        // Mark entities as "not in-use" by subscription.
-        removed.iter().for_each(|subscription| {
-            subscription.entity.decrease_subscriptions_count();
-        });
-
-        if let Some(manager) = client.subscription_manager(true).write().as_mut() {
-            // Notify manager to update its state with removed subscriptions.
-            if let Some((_, handler)) = self.clones.read().iter().next() {
-                let handler: Weak<dyn EventHandler<T, D> + Send + Sync> = handler.clone();
-                manager.update(&handler, Some(&removed));
-            }
-        };
+        self.sub_subscriptions(rhs.subscriptions.read().clone());
     }
 }
 
@@ -633,7 +727,7 @@ where
     /// # #[tokio::main]
     /// # async fn main() -> Result<(), pubnub::core::PubNubError> {
     /// use pubnub::subscribe::SubscriptionParams;
-    /// let client = // PubNubClient
+    /// let pubnub = // PubNubClient
     /// #     PubNubClientBuilder::with_reqwest_transport()
     /// #         .with_keyset(Keyset {
     /// #              subscribe_key: "demo",
@@ -642,7 +736,7 @@ where
     /// #          })
     /// #         .with_user_id("uuid")
     /// #         .build()?;
-    /// let subscription = client.subscription(SubscriptionParams {
+    /// let subscription = pubnub.subscription(SubscriptionParams {
     ///     channels: Some(&["my_channel_1", "my_channel_2"]),
     ///     channel_groups: Some(&["my_group"]),
     ///     options: None
@@ -694,6 +788,32 @@ where
         *self.is_subscribed.read()
     }
 
+    /// Register `Subscription` within `SubscriptionManager`.
+    ///
+    /// # Arguments
+    ///
+    /// - `cursor` - Subscription real-time events catch up cursor.
+    fn register_with_cursor(&self, cursor: Option<SubscriptionCursor>) {
+        let Some(client) = self.client().upgrade().clone() else {
+            return;
+        };
+
+        {
+            let manager = client.subscription_manager(true);
+            if let Some(manager) = manager.write().as_mut() {
+                // Mark entities as "in-use" by subscription.
+                self.subscriptions.read().iter().for_each(|subscription| {
+                    subscription.entity.increase_subscriptions_count();
+                });
+
+                if let Some((_, handler)) = self.clones.read().iter().next() {
+                    let handler: Weak<dyn EventHandler<T, D> + Send + Sync> = handler.clone();
+                    manager.register(&handler, cursor);
+                }
+            };
+        }
+    }
+
     /// Filters the given list of `Update` events based on the subscription
     /// input and the current timetoken.
     ///
@@ -751,43 +871,44 @@ where
     T: Transport + Send + Sync + 'static,
     D: Deserializer + Send + Sync + 'static,
 {
-    fn subscribe(&self, cursor: Option<SubscriptionCursor>) {
+    fn subscribe(&self) {
         let mut is_subscribed = self.is_subscribed.write();
         if *is_subscribed {
             return;
         }
         *is_subscribed = true;
 
-        if cursor.is_some() {
-            let mut cursor_slot = self.cursor.write();
-            if let Some(current_cursor) = cursor_slot.as_ref() {
-                let catchup_cursor = cursor.clone().unwrap_or_default();
-                catchup_cursor
-                    .gt(current_cursor)
-                    .then(|| *cursor_slot = Some(catchup_cursor));
-            } else {
-                *cursor_slot = cursor.clone();
-            }
-        }
+        self.register_with_cursor(self.cursor.read().clone())
+    }
 
-        let Some(client) = self.client().upgrade().clone() else {
+    fn subscribe_with_timetoken<SC>(&self, cursor: SC)
+    where
+        SC: Into<SubscriptionCursor>,
+    {
+        let mut is_subscribed = self.is_subscribed.write();
+        if *is_subscribed {
             return;
-        };
+        }
+        *is_subscribed = true;
 
-        {
-            let manager = client.subscription_manager(true);
-            if let Some(manager) = manager.write().as_mut() {
-                // Mark entities as "in-use" by subscription.
-                self.subscriptions.read().iter().for_each(|subscription| {
-                    subscription.entity.increase_subscriptions_count();
-                });
+        let user_cursor = cursor.into();
+        let cursor = user_cursor.is_valid().then_some(user_cursor);
 
-                if let Some((_, handler)) = self.clones.read().iter().next() {
-                    let handler: Weak<dyn EventHandler<T, D> + Send + Sync> = handler.clone();
-                    manager.register(&handler, cursor);
+        {
+            if cursor.is_some() {
+                let mut cursor_slot = self.cursor.write();
+                if let Some(current_cursor) = cursor_slot.as_ref() {
+                    let catchup_cursor = cursor.clone().unwrap_or_default();
+                    catchup_cursor
+                        .gt(current_cursor)
+                        .then(|| *cursor_slot = Some(catchup_cursor));
+                } else {
+                    *cursor_slot = cursor.clone();
                 }
-            };
+            }
         }
+
+        self.register_with_cursor(cursor);
     }
 
     fn unsubscribe(&self) {
@@ -797,6 +918,9 @@ where
                 return;
             }
             *is_subscribed_slot = false;
+
+            let mut cursor = self.cursor.write();
+            *cursor = None;
         }
 
         let Some(client) = self.client().upgrade().clone() else {
diff --git a/src/dx/subscribe/traits/event_subscribe.rs b/src/dx/subscribe/traits/event_subscribe.rs
index 9778fee4..2f01b7a1 100644
--- a/src/dx/subscribe/traits/event_subscribe.rs
+++ b/src/dx/subscribe/traits/event_subscribe.rs
@@ -11,8 +11,21 @@ use crate::subscribe::SubscriptionCursor;
 /// Types that implement this trait can change activity of real-time events
 /// processing for specific or set of entities.
 pub trait EventSubscriber {
-    /// Use receiver to subscribe for real-time updates.
-    fn subscribe(&self, cursor: Option<SubscriptionCursor>);
+    /// Use the receiver to subscribe for real-time updates.
+    fn subscribe(&self);
+
+    /// Use the receiver to subscribe for real-time updates starting at a
+    /// specific time.
+    ///
+    /// # Arguments
+    ///
+    /// - `cursor` - `SubscriptionCursor` from which, the client should try to
+    ///   catch up on real-time events. `cursor` also can be provided as `usize`
+    ///   or `String` with a 17-digit PubNub  timetoken. If `cursor` doesn't
+    ///   satisfy the requirements, it will be ignored.
+    fn subscribe_with_timetoken<SC>(&self, cursor: SC)
+    where
+        SC: Into<SubscriptionCursor>;
 
     /// Use receiver to stop receiving real-time updates.
     fn unsubscribe(&self);
diff --git a/src/dx/subscribe/types.rs b/src/dx/subscribe/types.rs
index 23cee281..a44a8a6a 100644
--- a/src/dx/subscribe/types.rs
+++ b/src/dx/subscribe/types.rs
@@ -90,7 +90,7 @@ pub enum SubscriptionOptions {
     /// Whether presence events should be received.
     ///
     /// Whether presence updates for `userId` should be delivered through
-    /// [`Subscription2`] listener streams or not.
+    /// [`Subscription`] and [`SubscriptionSet`] listener streams or not.
     ReceivePresenceEvents,
 }
 
@@ -128,52 +128,6 @@ pub struct SubscriptionCursor {
     pub region: u32,
 }
 
-impl PartialOrd for SubscriptionCursor {
-    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
-        Some(self.cmp(other))
-    }
-
-    fn lt(&self, other: &Self) -> bool {
-        let lhs = self.timetoken.parse::<u64>().expect("Invalid timetoken");
-        let rhs = other.timetoken.parse::<u64>().expect("Invalid timetoken");
-        lhs < rhs
-    }
-
-    fn le(&self, other: &Self) -> bool {
-        let lhs = self.timetoken.parse::<u64>().expect("Invalid timetoken");
-        let rhs = other.timetoken.parse::<u64>().expect("Invalid timetoken");
-        lhs <= rhs
-    }
-
-    fn gt(&self, other: &Self) -> bool {
-        let lhs = self.timetoken.parse::<u64>().expect("Invalid timetoken");
-        let rhs = other.timetoken.parse::<u64>().expect("Invalid timetoken");
-        lhs > rhs
-    }
-
-    fn ge(&self, other: &Self) -> bool {
-        let lhs = self.timetoken.parse::<u64>().expect("Invalid timetoken");
-        let rhs = other.timetoken.parse::<u64>().expect("Invalid timetoken");
-        lhs >= rhs
-    }
-}
-
-impl Ord for SubscriptionCursor {
-    fn cmp(&self, other: &Self) -> Ordering {
-        self.partial_cmp(other).unwrap_or(Ordering::Equal)
-    }
-}
-
-impl Debug for SubscriptionCursor {
-    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
-        write!(
-            f,
-            "SubscriptionCursor {{ timetoken: {}, region: {} }}",
-            self.timetoken, self.region
-        )
-    }
-}
-
 /// Subscription statuses.
 #[derive(Clone, PartialEq)]
 pub enum ConnectionStatus {
@@ -192,6 +146,20 @@ pub enum ConnectionStatus {
 
     /// Unexpected disconnection.
     DisconnectedUnexpectedly(PubNubError),
+
+    /// List of channels and groups changed in subscription.
+    SubscriptionChanged {
+        /// List of channels used in subscription.
+        ///
+        /// Channels can be:
+        /// - regular channels
+        /// - channel metadata `id`s
+        /// - user metadata `id`s
+        channels: Option<Vec<String>>,
+
+        /// List of channel groups used in subscription.
+        channel_groups: Option<Vec<String>>,
+    },
 }
 
 /// Presence update information.
@@ -602,6 +570,21 @@ pub enum MessageActionEvent {
     Delete,
 }
 
+impl SubscriptionCursor {
+    /// Checks if the `timetoken` is valid.
+    ///
+    /// A valid `timetoken` should have a length of 17 and contain only numeric
+    /// characters.
+    ///
+    /// # Returns
+    ///
+    /// Returns `true` if the `timetoken` is valid, otherwise `false`.
+    #[cfg(feature = "std")]
+    pub(crate) fn is_valid(&self) -> bool {
+        self.timetoken.len() == 17 && self.timetoken.chars().all(char::is_numeric)
+    }
+}
+
 impl Default for SubscriptionCursor {
     fn default() -> Self {
         Self {
@@ -611,10 +594,103 @@ impl Default for SubscriptionCursor {
     }
 }
 
+impl PartialOrd for SubscriptionCursor {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+
+    fn lt(&self, other: &Self) -> bool {
+        let lhs = self.timetoken.parse::<u64>().expect("Invalid timetoken");
+        let rhs = other.timetoken.parse::<u64>().expect("Invalid timetoken");
+        lhs < rhs
+    }
+
+    fn le(&self, other: &Self) -> bool {
+        let lhs = self.timetoken.parse::<u64>().expect("Invalid timetoken");
+        let rhs = other.timetoken.parse::<u64>().expect("Invalid timetoken");
+        lhs <= rhs
+    }
+
+    fn gt(&self, other: &Self) -> bool {
+        let lhs = self.timetoken.parse::<u64>().expect("Invalid timetoken");
+        let rhs = other.timetoken.parse::<u64>().expect("Invalid timetoken");
+        lhs > rhs
+    }
+
+    fn ge(&self, other: &Self) -> bool {
+        let lhs = self.timetoken.parse::<u64>().expect("Invalid timetoken");
+        let rhs = other.timetoken.parse::<u64>().expect("Invalid timetoken");
+        lhs >= rhs
+    }
+}
+
+impl Ord for SubscriptionCursor {
+    fn cmp(&self, other: &Self) -> Ordering {
+        self.partial_cmp(other).unwrap_or(Ordering::Equal)
+    }
+}
+
+impl Debug for SubscriptionCursor {
+    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
+        write!(
+            f,
+            "SubscriptionCursor {{ timetoken: {}, region: {} }}",
+            self.timetoken, self.region
+        )
+    }
+}
+
 impl From<String> for SubscriptionCursor {
     fn from(value: String) -> Self {
-        Self {
-            timetoken: value,
+        let mut timetoken = value;
+        if timetoken.len() != 17 || !timetoken.chars().all(char::is_numeric) {
+            timetoken = "-1".into();
+        }
+
+        SubscriptionCursor {
+            timetoken,
+            ..Default::default()
+        }
+    }
+}
+
+impl From<&str> for SubscriptionCursor {
+    fn from(value: &str) -> Self {
+        let mut timetoken = value;
+        if timetoken.len() != 17 || !timetoken.chars().all(char::is_numeric) {
+            timetoken = "-1";
+        }
+
+        SubscriptionCursor {
+            timetoken: timetoken.to_string(),
+            ..Default::default()
+        }
+    }
+}
+
+impl From<usize> for SubscriptionCursor {
+    fn from(value: usize) -> Self {
+        let mut timetoken = value.to_string();
+        if timetoken.len() != 17 {
+            timetoken = "-1".into();
+        }
+
+        SubscriptionCursor {
+            timetoken,
+            ..Default::default()
+        }
+    }
+}
+
+impl From<u64> for SubscriptionCursor {
+    fn from(value: u64) -> Self {
+        let mut timetoken = value.to_string();
+        if timetoken.len() != 17 {
+            timetoken = "-1".into();
+        }
+
+        SubscriptionCursor {
+            timetoken,
             ..Default::default()
         }
     }
@@ -671,6 +747,16 @@ impl Debug for ConnectionStatus {
             ConnectionStatus::DisconnectedUnexpectedly(err) => {
                 write!(f, "DisconnectedUnexpectedly({err:?})")
             }
+            Self::SubscriptionChanged {
+                channels,
+                channel_groups,
+            } => {
+                write!(
+                    f,
+                    "SubscriptionChanged {{ channels: {channels:?}, \
+                    channel_groups: {channel_groups:?}  }}"
+                )
+            }
         }
     }
 }
@@ -1111,4 +1197,88 @@ mod should {
     fn resolve_subscription_field_value(subscription: Option<String>, channel: &str) -> String {
         resolve_subscription_value(subscription, channel)
     }
+
+    #[test]
+    #[cfg(feature = "std")]
+    fn create_valid_subscription_cursor_as_struct() {
+        let cursor = SubscriptionCursor {
+            timetoken: "12345678901234567".into(),
+            region: 0,
+        };
+        assert!(cursor.is_valid())
+    }
+
+    #[test]
+    #[cfg(feature = "std")]
+    fn create_valid_subscription_cursor_from_string() {
+        let cursor: SubscriptionCursor = "12345678901234567".to_string().into();
+        assert!(cursor.is_valid())
+    }
+
+    #[test]
+    #[cfg(feature = "std")]
+    fn create_valid_subscription_cursor_from_string_slice() {
+        let cursor: SubscriptionCursor = "12345678901234567".into();
+        assert!(cursor.is_valid())
+    }
+
+    #[test]
+    #[cfg(feature = "std")]
+    fn create_valid_subscription_cursor_from_usize() {
+        let timetoken: usize = 12345678901234567;
+        let cursor: SubscriptionCursor = timetoken.into();
+        assert!(cursor.is_valid())
+    }
+
+    #[test]
+    #[cfg(feature = "std")]
+    fn create_valid_subscription_cursor_from_u64() {
+        let timetoken: u64 = 12345678901234567;
+        let cursor: SubscriptionCursor = timetoken.into();
+        assert!(cursor.is_valid())
+    }
+
+    #[test]
+    #[cfg(feature = "std")]
+    fn create_invalid_subscription_cursor_from_short_string() {
+        let cursor: SubscriptionCursor = "1234567890123467".to_string().into();
+        assert!(!cursor.is_valid())
+    }
+
+    #[test]
+    #[cfg(feature = "std")]
+    fn create_invalid_subscription_cursor_from_non_numeric_string() {
+        let cursor: SubscriptionCursor = "123456789a123467".to_string().into();
+        assert!(!cursor.is_valid())
+    }
+
+    #[test]
+    #[cfg(feature = "std")]
+    fn create_invalid_subscription_cursor_from_short_string_slice() {
+        let cursor: SubscriptionCursor = "1234567890123567".into();
+        assert!(!cursor.is_valid())
+    }
+
+    #[test]
+    #[cfg(feature = "std")]
+    fn create_invalid_subscription_cursor_from_non_numeric_string_slice() {
+        let cursor: SubscriptionCursor = "1234567890123a567".into();
+        assert!(!cursor.is_valid())
+    }
+
+    #[test]
+    #[cfg(feature = "std")]
+    fn create_invalid_subscription_cursor_from_too_small_usize() {
+        let timetoken: usize = 123456789012567;
+        let cursor: SubscriptionCursor = timetoken.into();
+        assert!(!cursor.is_valid())
+    }
+
+    #[test]
+    #[cfg(feature = "std")]
+    fn create_invalid_subscription_cursor_from_too_small_u64() {
+        let timetoken: u64 = 123901234567;
+        let cursor: SubscriptionCursor = timetoken.into();
+        assert!(!cursor.is_valid())
+    }
 }
diff --git a/src/lib.rs b/src/lib.rs
index 0e86640b..0b9a37f7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -39,11 +39,11 @@
 //! ```toml
 //! # default features
 //! [dependencies]
-//! pubnub = "0.5.0"
+//! pubnub = "0.6.0"
 //!
 //! # all features
 //! [dependencies]
-//! pubnub = { version = "0.5.0", features = ["full"] }
+//! pubnub = { version = "0.6.0", features = ["full"] }
 //! ```
 //!
 //! ### Example
@@ -51,17 +51,20 @@
 //! Try the following sample code to get up and running quickly!
 //!
 //! ```no_run
-//! use pubnub::{Keyset, PubNubClientBuilder};
-//! use pubnub::dx::subscribe::{SubscribeStreamEvent, Update};
+//! use pubnub::subscribe::Subscriber;
 //! use futures::StreamExt;
 //! use tokio::time::sleep;
 //! use std::time::Duration;
 //! use serde_json;
-//!
+//! use pubnub::{
+//!     dx::subscribe::Update,
+//!     subscribe::EventSubscriber,
+//!     Keyset, PubNubClientBuilder,
+//! };
 //! #[tokio::main]
 //! async fn main() -> Result<(), Box<dyn std::error::Error>> {
 //!     use pubnub::subscribe::{EventEmitter, SubscriptionParams};
-//! let publish_key = "my_publish_key";
+//!     let publish_key = "my_publish_key";
 //!     let subscribe_key = "my_subscribe_key";
 //!     let client = PubNubClientBuilder::with_reqwest_transport()
 //!         .with_keyset(Keyset {
@@ -71,15 +74,22 @@
 //!         })
 //!         .with_user_id("user_id")
 //!         .build()?;
+//!
 //!     println!("PubNub instance created");
-//!    
+//!
 //!     let subscription = client.subscription(SubscriptionParams {
 //!         channels: Some(&["my_channel"]),
 //!         channel_groups: None,
 //!         options: None
 //!     });
 //!
-//!     println!("Subscribed to channel");
+//!     let channel_entity = client.channel("my_channel_2");
+//!     let channel_entity_subscription = channel_entity.subscription(None);
+//!
+//!     subscription.subscribe();
+//!     channel_entity_subscription.subscribe();
+//!
+//!     println!("Subscribed to channels");
 //!
 //!     // Launch a new task to print out each received message
 //!     tokio::spawn(client.status_stream().for_each(|status| async move {
@@ -110,7 +120,21 @@
 //!         }
 //!     }));
 //!
-//!     sleep(Duration::from_secs(1)).await;
+//!     // Explicitly listen only for real-time `message` updates.
+//!     tokio::spawn(
+//!         channel_entity_subscription
+//!             .messages_stream()
+//!             .for_each(|message| async move {
+//!                 if let Ok(utf8_message) = String::from_utf8(message.data.clone()) {
+//!                     if let Ok(cleaned) = serde_json::from_str::<String>(&utf8_message) {
+//!                         println!("message: {}", cleaned);
+//!                     }
+//!                 }
+//!             }),
+//!     );
+//!
+//!    sleep(Duration::from_secs(2)).await;
+//!    
 //!     // Send a message to the channel
 //!     client
 //!         .publish_message("hello world!")
@@ -119,7 +143,15 @@
 //!         .execute()
 //!         .await?;
 //!
-//!     sleep(Duration::from_secs(10)).await;
+//!    // Send a message to another channel
+//!     client
+//!         .publish_message("hello world on the other channel!")
+//!         .channel("my_channel_2")
+//!         .r#type("text-message")
+//!         .execute()
+//!         .await?;
+//!
+//!     sleep(Duration::from_secs(15)).await;
 //!
 //!     Ok(())
 //! }
@@ -135,11 +167,11 @@
 //! ```toml
 //! # only blocking and access + default features
 //! [dependencies]
-//! pubnub = { version = "0.5.0", features = ["blocking", "access"] }
+//! pubnub = { version = "0.6.0", features = ["blocking", "access"] }
 //!
 //! # only parse_token + default features
 //! [dependencies]
-//! pubnub = { version = "0.5.0", features = ["parse_token"] }
+//! pubnub = { version = "0.6.0", features = ["parse_token"] }
 //! ```
 //!
 //! ### Available features
@@ -178,7 +210,7 @@
 //!
 //! ```toml
 //! [dependencies]
-//! pubnub = { version = "0.5.0", default-features = false, features = ["serde", "publish",
+//! pubnub = { version = "0.6.0", default-features = false, features = ["serde", "publish",
 //! "blocking"] }
 //! ```
 //!
diff --git a/tests/presence/presence_steps.rs b/tests/presence/presence_steps.rs
index 51a6fa41..ad8e7404 100644
--- a/tests/presence/presence_steps.rs
+++ b/tests/presence/presence_steps.rs
@@ -138,7 +138,7 @@ async fn join(
         subscriptions.values().cloned().collect(),
         options.clone(),
     );
-    subscription.subscribe(None);
+    subscription.subscribe();
     world.subscription = Some(subscription);
     world.subscriptions = Some(subscriptions);
 }
diff --git a/tests/subscribe/subscribe_steps.rs b/tests/subscribe/subscribe_steps.rs
index 529c39eb..b5d05aca 100644
--- a/tests/subscribe/subscribe_steps.rs
+++ b/tests/subscribe/subscribe_steps.rs
@@ -4,7 +4,7 @@ use cucumber::gherkin::Table;
 use cucumber::{codegen::Regex, gherkin::Step, then, when};
 use futures::{select_biased, FutureExt, StreamExt};
 use pubnub::core::RequestRetryConfiguration;
-use pubnub::subscribe::{EventEmitter, EventSubscriber, SubscriptionParams};
+use pubnub::subscribe::{EventEmitter, EventSubscriber, SubscriptionCursor, SubscriptionParams};
 use std::fs::read_to_string;
 
 /// Extract list of events and invocations from log.
@@ -122,7 +122,7 @@ async fn subscribe(world: &mut PubNubWorld) {
             channel_groups: None,
             options: None,
         });
-        subscription.subscribe(None);
+        subscription.subscribe();
         world.subscription = Some(subscription);
     });
 }
@@ -139,7 +139,11 @@ async fn subscribe_with_timetoken(world: &mut PubNubWorld, timetoken: u64) {
             channel_groups: None,
             options: None,
         });
-        subscription.subscribe(Some(timetoken.to_string().into()));
+
+        subscription.subscribe_with_timetoken(SubscriptionCursor {
+            timetoken: timetoken.to_string(),
+            ..Default::default()
+        });
         world.subscription = Some(subscription);
     });
 }