Skip to content

Commit

Permalink
Presence event engine (#164)
Browse files Browse the repository at this point in the history
feat(presence): add presence event engine

feat(presence): add set state endpoint

Add an endpoint which allows associating `user_id` state with a list of channels and groups.

refactor(deserializer): refactor custom deserializer configuration

Custom deserializer now can be set during client configuration.

refactor(runtime): refactor runtime environment configuration

Runtime environment now can be set during client configuration.

refactor(parse): refactor PubNub endpoint response parsing

---------

Co-authored-by: Xavrax <[email protected]>
  • Loading branch information
parfeon and Xavrax authored Aug 28, 2023
1 parent ffaa5ab commit ea5e18d
Show file tree
Hide file tree
Showing 68 changed files with 7,987 additions and 2,617 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ build = "build.rs"
[features]

# Enables all non-conflicting features
full = ["publish", "access", "serde", "reqwest", "aescbc", "parse_token", "blocking", "std", "tokio"]
full = ["publish", "subscribe", "presence", "access", "serde", "reqwest", "aescbc", "parse_token", "blocking", "std", "tokio"]

# Enables all default features
default = ["publish", "subscribe", "serde", "reqwest", "aescbc", "std", "blocking", "tokio"]
default = ["publish", "subscribe", "presence", "serde", "reqwest", "aescbc", "std", "blocking", "tokio"]

# [PubNub features]

Expand Down Expand Up @@ -70,6 +70,7 @@ pubnub_only = ["aescbc", "parse_token", "blocking", "publish", "access", "subscr
mock_getrandom = ["getrandom/custom"]
# TODO: temporary treated as internal until we officially release it
subscribe = ["dep:futures"]
presence = ["dep:futures"]

[dependencies]
async-trait = "0.1"
Expand Down
9 changes: 8 additions & 1 deletion examples/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,14 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {

let subscription = client
.subscribe()
.channels(["my_channel".into(), "other_channel".into()].to_vec())
.channels(
[
"my_channel".into(),
"other_channel".into(),
"channel-test-history".into(),
]
.to_vec(),
)
.heartbeat(10)
.filter_expression("some_filter")
.execute()?;
Expand Down
12 changes: 12 additions & 0 deletions src/core/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,15 @@ pub trait Deserialize<'de>: Send + Sync {
/// Deserialize the value
fn deserialize(bytes: &'de [u8]) -> Result<Self::Type, PubNubError>;
}

#[cfg(not(feature = "serde"))]
impl<'de, D> Deserialize<'de> for D
where
D: Sync + Send,
{
type Type = D;

fn deserialize(_bytes: &'de [u8]) -> Result<Self::Type, PubNubError> {
unimplemented!("Please implement this method for type")
}
}
24 changes: 20 additions & 4 deletions src/core/deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use crate::core::PubNubError;
///
/// struct MyDeserializer;
///
/// impl Deserializer<PublishResult> for MyDeserializer {
/// fn deserialize(&self, bytes: &[u8]) -> Result<PublishResult, PubNubError> {
/// impl Deserializer for MyDeserializer {
/// fn deserialize<PublishResult>(&self, bytes: &[u8]) -> Result<PublishResult, PubNubError> {
/// // ...
/// # unimplemented!()
/// }
Expand All @@ -42,7 +42,7 @@ use crate::core::PubNubError;
/// [`PublishResponseBody`]: ../../dx/publish/result/enum.PublishResponseBody.html
/// [`GrantTokenResponseBody`]: ../../dx/access/result/enum.GrantTokenResponseBody.html
/// [`RevokeTokenResponseBody`]: ../../dx/access/result/enum.RevokeTokenResponseBody.html
pub trait Deserializer<T>: Send + Sync {
pub trait Deserializer: Send + Sync {
/// Deserialize a `&Vec<u8>` into a `Result<T, PubNubError>`.
///
/// # Errors
Expand All @@ -51,5 +51,21 @@ pub trait Deserializer<T>: Send + Sync {
/// deserialization fails.
///
/// [`PubNubError::DeserializationError`]: ../enum.PubNubError.html#variant.DeserializationError
fn deserialize(&self, bytes: &[u8]) -> Result<T, PubNubError>;
#[cfg(not(feature = "serde"))]
fn deserialize<T>(&self, bytes: &[u8]) -> Result<T, PubNubError>
where
T: for<'de> crate::core::Deserialize<'de>;

/// Deserialize a `&Vec<u8>` into a `Result<T, PubNubError>`.
///
/// # Errors
///
/// This method should return [`PubNubError::DeserializationError`] if the
/// deserialization fails.
///
/// [`PubNubError::DeserializationError`]: ../enum.PubNubError.html#variant.DeserializationError
#[cfg(feature = "serde")]
fn deserialize<T>(&self, bytes: &[u8]) -> Result<T, PubNubError>
where
T: for<'de> serde::Deserialize<'de>;
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
//! Managed effects cancellation module.
//!
//! This module provides [`CancellationTask`] which can be used to cancel
//! managed effects.
use async_channel::Receiver;

use crate::{
Expand All @@ -12,7 +16,7 @@ pub(crate) struct CancellationTask {
}

impl CancellationTask {
pub(super) fn new(cancel_rx: Receiver<String>, id: String) -> Self {
pub fn new(cancel_rx: Receiver<String>, id: String) -> Self {
Self { cancel_rx, id }
}

Expand Down
12 changes: 6 additions & 6 deletions src/core/event_engine/effect_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ where
log::debug!("Received invocation: {}", invocation.id());

let effect = cloned_self.dispatch(&invocation);
let task_completition = completion.clone();
let task_completion = completion.clone();

if let Some(effect) = effect {
log::debug!("Dispatched effect: {}", effect.id());
Expand All @@ -86,7 +86,7 @@ where
cloned_self.remove_managed_effect(effect.id());
}

task_completition(events);
task_completion(events);
});
}
}
Expand Down Expand Up @@ -260,7 +260,7 @@ mod should {
"Non managed effects shouldn't be stored"
);

assert!(effect.unwrap().id() == "EFFECT_ONE");
assert_eq!(effect.unwrap().id(), "EFFECT_ONE");
}

#[tokio::test]
Expand All @@ -275,22 +275,22 @@ mod should {
"Managed effect should be removed on completion"
);

assert!(effect.unwrap().id() == "EFFECT_TWO");
assert_eq!(effect.unwrap().id(), "EFFECT_TWO");
}

#[test]
fn cancel_managed_effect() {
let (_tx, rx) = async_channel::bounded::<TestInvocation>(5);
let dispatcher = Arc::new(EffectDispatcher::new(TestEffectHandler {}, rx));
dispatcher.dispatch(&TestInvocation::Three);
let cancelation_effect = dispatcher.dispatch(&TestInvocation::CancelThree);
let cancellation_effect = dispatcher.dispatch(&TestInvocation::CancelThree);

assert_eq!(
dispatcher.managed.read().len(),
0,
"Managed effect should be cancelled"
);

assert!(cancelation_effect.is_none());
assert!(cancellation_effect.is_none());
}
}
13 changes: 8 additions & 5 deletions src/core/event_engine/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
//! Event Engine module
use crate::lib::alloc::sync::Arc;
use async_channel::Sender;
use log::error;
use spin::rwlock::RwLock;

use crate::{core::runtime::Runtime, lib::alloc::sync::Arc};

#[doc(inline)]
pub(crate) use effect::Effect;
pub(crate) mod effect;
Expand All @@ -29,12 +30,14 @@ pub(crate) mod event;
pub(crate) use state::State;
pub(crate) mod state;

use crate::core::runtime::Runtime;
#[doc(inline)]
pub(crate) use transition::Transition;

pub(crate) mod transition;

#[doc(inline)]
pub(crate) use cancel::CancellationTask;
pub(crate) mod cancel;

/// State machine's event engine.
///
/// [`EventEngine`] is the core of state machines used in PubNub client and
Expand Down Expand Up @@ -214,8 +217,8 @@ mod should {
.exit()
.unwrap_or(vec![])
.into_iter()
.chain(invocations.unwrap_or(vec![]).into_iter())
.chain(state.enter().unwrap_or(vec![]).into_iter())
.chain(invocations.unwrap_or(vec![]))
.chain(state.enter().unwrap_or(vec![]))
.collect(),
state,
}
Expand Down
12 changes: 7 additions & 5 deletions src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
pub use error::PubNubError;
pub mod error;

#[cfg(any(feature = "publish", feature = "access"))]
#[doc(inline)]
pub(crate) use error_response::APIErrorBody;
#[cfg(any(feature = "publish", feature = "access"))]
pub(crate) mod error_response;
#[cfg(any(
feature = "publish",
feature = "access",
feature = "subscribe",
feature = "presence"
))]
pub(crate) mod service_response;

#[cfg(feature = "blocking")]
#[doc(inline)]
Expand Down
57 changes: 56 additions & 1 deletion src/core/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@
//! The [`Spawner`] trait is used to spawn async tasks in work of the PubNub
//! client.
use crate::lib::{alloc::boxed::Box, core::future::Future};
use crate::lib::{
alloc::{
fmt::{Debug, Formatter, Result},
sync::Arc,
},
core::future::Future,
};
use futures::future::{BoxFuture, FutureExt};

/// PubNub spawner trait.
///
Expand Down Expand Up @@ -45,3 +52,51 @@ pub trait Runtime: Clone + Send {
/// Sleep current task for specified amount of time (in seconds).
async fn sleep(self, delay: u64);
}

#[derive(Clone)]
pub(crate) struct RuntimeSupport {
spawner: Arc<dyn Fn(BoxFuture<'static, ()>) + Send + Sync>,
sleeper: Arc<dyn Fn(u64) -> BoxFuture<'static, ()> + Send + Sync>,
}

impl RuntimeSupport {
pub fn new<R>(runtime: Arc<R>) -> Self
where
R: Runtime + Copy + Send + Sync + 'static,
{
let spawn_runtime = runtime.clone();
let sleep_runtime = runtime.clone();

Self {
sleeper: Arc::new(move |delay| sleep_runtime.sleep(delay).boxed()),
spawner: Arc::new(Box::new(move |future| {
spawn_runtime.spawn(future);
})),
}
}
}

#[async_trait::async_trait]
impl Runtime for RuntimeSupport {
fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static)
where
R: Send + 'static,
{
(self.spawner.clone())(
async move {
future.await;
}
.boxed(),
);
}

async fn sleep(self, delay: u64) {
(self.sleeper)(delay).await
}
}

impl Debug for RuntimeSupport {
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
write!(f, "RuntimeSupport {{}}")
}
}
Loading

0 comments on commit ea5e18d

Please sign in to comment.