Skip to content

Commit

Permalink
Merge branch 'master' into feat/presence-no-std
Browse files Browse the repository at this point in the history
  • Loading branch information
Xavrax authored Aug 30, 2023
2 parents 87fd219 + 3483fb6 commit dfebaf3
Show file tree
Hide file tree
Showing 16 changed files with 447 additions and 78 deletions.
9 changes: 1 addition & 8 deletions examples/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,7 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {

let subscription = client
.subscribe()
.channels(
[
"my_channel".into(),
"other_channel".into(),
"channel-test-history".into(),
]
.to_vec(),
)
.channels(["my_channel".into(), "other_channel".into()].to_vec())
.heartbeat(10)
.filter_expression("some_filter")
.execute()?;
Expand Down
3 changes: 2 additions & 1 deletion src/core/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ pub enum PubNubError {
details: String,
},

///this error is returned when REST API request can't be handled by service.
///this error is returned when REST API request can't be handled by
/// service.
#[snafu(display("REST API error: {message}"))]
API {
/// Operation status (HTTP) code.
Expand Down
3 changes: 2 additions & 1 deletion src/dx/presence/builders/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ pub struct HeartbeatRequest<T, D> {
/// ```
#[builder(
field(vis = "pub(in crate::dx::presence)"),
setter(custom, strip_option)
setter(custom, strip_option),
default = "None"
)]
pub(in crate::dx::presence) state: Option<Vec<u8>>,

Expand Down
2 changes: 1 addition & 1 deletion src/dx/presence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ where
/// Prepare presence event engine instance which will be used for `user_id`
/// presence announcement and management.
#[cfg(feature = "std")]
pub(crate) fn presence_event_engine(&self) -> Arc<PresenceEventEngine> {
fn presence_event_engine(&self) -> Arc<PresenceEventEngine> {
let channel_bound = 3;
let (cancel_tx, cancel_rx) = async_channel::bounded::<String>(channel_bound);
let delayed_heartbeat_cancel_rx = cancel_rx.clone();
Expand Down
42 changes: 27 additions & 15 deletions src/dx/presence/presence_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! presence / heartbeat module components.
use crate::{
dx::presence::event_engine::{PresenceEvent, PresenceEventEngine},
dx::presence::event_engine::PresenceEventEngine,
lib::{
alloc::{string::String, sync::Arc, vec::Vec},
core::{
Expand Down Expand Up @@ -88,29 +88,41 @@ pub(crate) struct PresenceManagerRef {

impl PresenceManagerRef {
/// Announce `join` for `user_id` on provided channels and groups.
#[allow(dead_code)]
pub(crate) fn announce_join(
&self,
channels: Option<Vec<String>>,
channel_groups: Option<Vec<String>>,
_channels: Option<Vec<String>>,
_channel_groups: Option<Vec<String>>,
) {
self.event_engine.process(&PresenceEvent::Joined {
channels,
channel_groups,
})
// TODO: Uncomment after contract test server fix.
// self.event_engine.process(&PresenceEvent::Joined {
// channels,
// channel_groups,
// })
}

/// Announce `leave` for `user_id` on provided channels and groups.
#[allow(dead_code)]
pub(crate) fn announce_left(
&self,
channels: Option<Vec<String>>,
channel_groups: Option<Vec<String>>,
_channels: Option<Vec<String>>,
_channel_groups: Option<Vec<String>>,
) {
self.event_engine.process(&PresenceEvent::Left {
channels,
channel_groups,
})
// TODO: Uncomment after contract test server fix.
// self.event_engine.process(&PresenceEvent::Left {
// channels,
// channel_groups,
// })
}

/// Announce `leave` while client disconnected.
pub(crate) fn disconnect(&self) {
// TODO: Uncomment after contract test server fix.
// self.event_engine.process(&PresenceEvent::Disconnect);
}

/// Announce `join` upon client connection.
pub(crate) fn reconnect(&self) {
// TODO: Uncomment after contract test server fix.
// self.event_engine.process(&PresenceEvent::Reconnect);
}
}

Expand Down
29 changes: 18 additions & 11 deletions src/dx/pubnub_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ pub type PubNubGenericClient<T, D> = PubNubClientInstance<PubNubMiddleware<T>, D
/// You must provide a valid [`Keyset`] with pub/sub keys and a string User ID
/// to identify the client.
///
/// To see available methods, please refer to the [`PubNubClientInstance`] documentation.
/// To see available methods, please refer to the [`PubNubClientInstance`]
/// documentation.
///
/// # Examples
/// ```
Expand Down Expand Up @@ -216,7 +217,8 @@ pub type PubNubClient = PubNubGenericClient<TransportReqwest, DeserializerSerde>
/// PubNub client raw instance.
///
/// This struct contains the actual client state.
/// It shouldn't be used directly. Use [`PubNubGenericClient`] or [`PubNubClient`] instead.
/// It shouldn't be used directly. Use [`PubNubGenericClient`] or
/// [`PubNubClient`] instead.
#[derive(Debug)]
pub struct PubNubClientInstance<T, D> {
pub(crate) inner: Arc<PubNubClientRef<T, D>>,
Expand Down Expand Up @@ -592,7 +594,8 @@ pub struct PubNubClientBuilder;
impl PubNubClientBuilder {
/// Set the transport layer for the client.
///
/// Returns [`PubNubClientRuntimeBuilder`] where depending from enabled `features` following can be set:
/// Returns [`PubNubClientRuntimeBuilder`] where depending from enabled
/// `features` following can be set:
/// * runtime environment
/// * API ket set to access [`PubNub API`].
///
Expand Down Expand Up @@ -641,7 +644,8 @@ impl PubNubClientBuilder {

/// Set the transport layer for the client.
///
/// Returns [`PubNubClientDeserializerBuilder`] where depending from enabled `features` following can be set:
/// Returns [`PubNubClientDeserializerBuilder`] where depending from enabled
/// `features` following can be set:
/// * [`PubNub API`] response deserializer
/// * API ket set to access [`PubNub API`].
///
Expand Down Expand Up @@ -690,7 +694,8 @@ impl PubNubClientBuilder {

/// Set the blocking transport layer for the client.
///
/// Returns [`PubNubClientRuntimeBuilder`] where depending from enabled `features` following can be set:
/// Returns [`PubNubClientRuntimeBuilder`] where depending from enabled
/// `features` following can be set:
/// * runtime environment
/// * API ket set to access [`PubNub API`].
///
Expand Down Expand Up @@ -742,7 +747,8 @@ impl PubNubClientBuilder {

/// Set the blocking transport layer for the client.
///
/// Returns [`PubNubClientDeserializerBuilder`] where depending from enabled `features` following can be set:
/// Returns [`PubNubClientDeserializerBuilder`] where depending from enabled
/// `features` following can be set:
/// * [`PubNub API`] response deserializer
/// * API ket set to access [`PubNub API`].
///
Expand Down Expand Up @@ -795,8 +801,8 @@ impl PubNubClientBuilder {

/// PubNub builder for [`PubNubClient`] to set API keys.
///
/// The builder provides methods to set the [`PubNub API`] keys set and returns the next
/// step of the builder with the remaining parameters.
/// The builder provides methods to set the [`PubNub API`] keys set and returns
/// the next step of the builder with the remaining parameters.
///
/// See [`PubNubClient`] for more information.
///
Expand Down Expand Up @@ -862,7 +868,8 @@ impl<T, D> PubNubClientKeySetBuilder<T, D> {
/// Runtime will be used for detached tasks spawning and delayed task execution.
///
/// Depending from enabled `features` methods may return:
/// * [`PubNubClientDeserializerBuilder`] to set custom [`PubNub API`] deserializer
/// * [`PubNubClientDeserializerBuilder`] to set custom [`PubNub API`]
/// deserializer
/// * [`PubNubClientKeySetBuilder`] to set API keys set to access [`PubNub API`]
/// * [`PubNubClientUserIdBuilder`] to set user id for the client.
///
Expand All @@ -877,7 +884,8 @@ pub struct PubNubClientRuntimeBuilder<T> {
impl<T> PubNubClientRuntimeBuilder<T> {
/// Set runtime environment.
///
/// Returns [`PubNubClientDeserializerBuilder`] where depending from enabled `features` following can be set:
/// Returns [`PubNubClientDeserializerBuilder`] where depending from enabled
/// `features` following can be set:
/// * [`PubNub API`] response deserializer
/// * API ket set to access [`PubNub API`].
///
Expand Down Expand Up @@ -1244,7 +1252,6 @@ where
/// secret_key: Some("sec-c-abc123"),
/// };
/// ```
///
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Keyset<S>
where
Expand Down
2 changes: 1 addition & 1 deletion src/dx/subscribe/builders/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::{
///
/// It should not be created directly, but via [`PubNubClient::subscribe`]
/// and wrapped in [`Subscription`] struct.
#[derive(Debug, Builder)]
#[derive(Builder)]
#[builder(
pattern = "owned",
name = "RawSubscriptionBuilder",
Expand Down
2 changes: 1 addition & 1 deletion src/dx/subscribe/builders/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::{core::event_engine::cancel::CancellationTask, lib::alloc::sync::Arc}
/// from the [`PubNub`] network.
///
/// [`PubNub`]:https://www.pubnub.com/
#[derive(Debug, Builder)]
#[derive(Builder)]
#[builder(
pattern = "owned",
build_fn(vis = "pub(in crate::dx::subscribe)", validate = "Self::validate"),
Expand Down
34 changes: 34 additions & 0 deletions src/dx/subscribe/builders/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ pub struct SubscriptionStreamRef<D> {
///
/// Handler used each time when new data available for a stream listener.
waker: RwLock<Option<Waker>>,

/// Whether stream still valid or not.
is_valid: bool,
}

/// Subscription that is responsible for getting messages from PubNub.
Expand Down Expand Up @@ -505,6 +508,27 @@ impl Subscription {
let subscription = &update.subscription();
self.input.contains_channel(subscription) || self.input.contains_channel_group(subscription)
}

/// Invalidate all streams.
pub(crate) fn invalidate(&mut self) {
let mut stream_slot = self.stream.write();
if let Some(mut stream) = stream_slot.clone() {
stream.invalidate()
}
*stream_slot = None;

let mut stream_slot = self.status_stream.write();
if let Some(mut stream) = stream_slot.clone() {
stream.invalidate()
}
*stream_slot = None;

let mut stream_slot = self.updates_stream.write();
if let Some(mut stream) = stream_slot.clone() {
stream.invalidate()
}
*stream_slot = None;
}
}

impl<D> SubscriptionStream<D> {
Expand All @@ -516,10 +540,16 @@ impl<D> SubscriptionStream<D> {
inner: Arc::new(SubscriptionStreamRef {
updates: RwLock::new(stream_updates),
waker: RwLock::new(None),
is_valid: true,
}),
}
}

pub(crate) fn invalidate(&mut self) {
self.is_valid = false;
self.wake_task();
}

fn wake_task(&self) {
if let Some(waker) = self.waker.write().take() {
waker.wake();
Expand All @@ -531,6 +561,10 @@ impl<D> Stream for SubscriptionStream<D> {
type Item = D;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if !self.is_valid {
return Poll::Ready(None);
}

let mut waker_slot = self.waker.write();
*waker_slot = Some(cx.waker().clone());

Expand Down
4 changes: 4 additions & 0 deletions src/dx/subscribe/event_engine/effects/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ pub(super) async fn execute(
input.channel_groups().unwrap_or(Vec::new())
);

if input.is_empty {
return vec![SubscribeEvent::UnsubscribeAll];
}

executor(SubscriptionParams {
channels: &input.channels(),
channel_groups: &input.channel_groups(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ pub(super) async fn execute(
input.channel_groups().unwrap_or(Vec::new())
);

if input.is_empty {
return vec![SubscribeEvent::UnsubscribeAll];
}

executor(SubscriptionParams {
channels: &input.channels(),
channel_groups: &input.channel_groups(),
Expand Down
10 changes: 9 additions & 1 deletion src/dx/subscribe/event_engine/effects/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use futures::TryFutureExt;
use log::info;

use crate::{
core::PubNubError,
dx::subscribe::{
event_engine::{
effects::SubscribeEffectExecutor, types::SubscriptionParams, SubscribeEvent,
Expand All @@ -25,6 +26,10 @@ pub(crate) async fn execute(
input.channel_groups().unwrap_or(Vec::new())
);

if input.is_empty {
return vec![SubscribeEvent::UnsubscribeAll];
}

executor(SubscriptionParams {
channels: &input.channels(),
channel_groups: &input.channel_groups(),
Expand All @@ -36,7 +41,10 @@ pub(crate) async fn execute(
.map_ok_or_else(
|error| {
log::error!("Receive error: {:?}", error);
vec![SubscribeEvent::ReceiveFailure { reason: error }]

(!matches!(error, PubNubError::EffectCanceled))
.then(|| vec![SubscribeEvent::ReceiveFailure { reason: error }])
.unwrap_or(vec![])
},
|subscribe_result| {
vec![SubscribeEvent::ReceiveSuccess {
Expand Down
4 changes: 4 additions & 0 deletions src/dx/subscribe/event_engine/effects/receive_reconnection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ pub(crate) async fn execute(
input.channel_groups().unwrap_or(Vec::new())
);

if input.is_empty {
return vec![SubscribeEvent::UnsubscribeAll];
}

executor(SubscriptionParams {
channels: &input.channels(),
channel_groups: &input.channel_groups(),
Expand Down
6 changes: 6 additions & 0 deletions src/dx/subscribe/event_engine/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ impl Add for SubscribeInput {
}
}

impl Default for SubscribeInput {
fn default() -> Self {
SubscribeInput::new(&None, &None)
}
}

impl AddAssign for SubscribeInput {
fn add_assign(&mut self, rhs: Self) {
let channel_groups = self.join_sets(&self.channel_groups, &rhs.channel_groups);
Expand Down
Loading

0 comments on commit dfebaf3

Please sign in to comment.