Skip to content

Commit

Permalink
Merge pull request #12 from mbachmann-sts/main
Browse files Browse the repository at this point in the history
implemented subscribing to live messages only
  • Loading branch information
babymotte authored Oct 18, 2023
2 parents c2532c3 + 85452ab commit 2524fb5
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 135 deletions.
16 changes: 9 additions & 7 deletions worterbuch-cli/src/bin/wbpsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ struct Args {
patterns: Option<Vec<String>>,
/// Only receive unique values, i.e. skip notifications when a key is set to a value it already has.
#[arg(short, long)]
unique: Option<bool>,
unique: bool,
/// Only receive live values, i.e. do not receive a callback for the state currently stored on the broker.
#[arg(short, long)]
live_only: bool,
}

#[tokio::main(flavor = "current_thread")]
Expand Down Expand Up @@ -56,7 +59,8 @@ async fn run(subsys: SubsystemHandle) -> Result<()> {
config.port = args.port.unwrap_or(config.port);
let json = args.json;
let patterns = args.patterns;
let unique = args.unique.unwrap_or(false);
let unique = args.unique;
let live_only = args.live_only;

let (disco_tx, mut disco_rx) = mpsc::channel(1);
let on_disconnect = async move {
Expand All @@ -80,11 +84,9 @@ async fn run(subsys: SubsystemHandle) -> Result<()> {
print_message(&msg, json);
},
recv = next_item(&mut rx, done) => match recv {
Some(key ) => if unique {
wb.psubscribe_unique_async(key, Some(Duration::from_millis(1))).await?;
} else {
wb.psubscribe_async(key, Some(Duration::from_millis(1))).await?;
},
Some(key) => {
wb.psubscribe_async(key, unique,live_only, Some(Duration::from_millis(1))).await?;
},
None => done = true,
},
}
Expand Down
16 changes: 9 additions & 7 deletions worterbuch-cli/src/bin/wbsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ struct Args {
keys: Option<Vec<String>>,
/// Only receive unique values, i.e. skip notifications when a key is set to a value it already has.
#[arg(short, long)]
unique: Option<bool>,
unique: bool,
/// Only receive live values, i.e. do not receive a callback for the state currently stored on the broker.
#[arg(short, long)]
live_only: bool,
}

#[tokio::main(flavor = "current_thread")]
Expand Down Expand Up @@ -56,7 +59,8 @@ async fn run(subsys: SubsystemHandle) -> Result<()> {
config.port = args.port.unwrap_or(config.port);
let json = args.json;
let keys = args.keys;
let unique = args.unique.unwrap_or(false);
let unique = args.unique;
let live_only = args.live_only;

let (disco_tx, mut disco_rx) = mpsc::channel(1);
let on_disconnect = async move {
Expand All @@ -80,11 +84,9 @@ async fn run(subsys: SubsystemHandle) -> Result<()> {
print_message(&msg, json);
},
recv = next_item(&mut rx, done) => match recv {
Some(key ) => if unique {
wb.subscribe_unique_async(key).await?;
} else {
wb.subscribe_async(key).await?;
},
Some(key ) => {
wb.subscribe_async(key, unique, live_only).await?;
},
None => done = true,
},
}
Expand Down
148 changes: 53 additions & 95 deletions worterbuch-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,29 @@ pub(crate) enum Command {
UniqueFlag,
oneshot::Sender<TransactionId>,
mpsc::UnboundedSender<Option<Value>>,
LiveOnlyFlag,
),
SubscribeAsync(
Key,
UniqueFlag,
oneshot::Sender<TransactionId>,
LiveOnlyFlag,
),
SubscribeAsync(Key, UniqueFlag, oneshot::Sender<TransactionId>),
PSubscribe(
Key,
UniqueFlag,
oneshot::Sender<TransactionId>,
mpsc::UnboundedSender<PStateEvent>,
Option<u64>,
LiveOnlyFlag,
),
PSubscribeAsync(
Key,
UniqueFlag,
oneshot::Sender<TransactionId>,
Option<u64>,
LiveOnlyFlag,
),
PSubscribeAsync(Key, UniqueFlag, oneshot::Sender<TransactionId>, Option<u64>),
Unsubscribe(TransactionId),
SubscribeLs(
Option<Key>,
Expand Down Expand Up @@ -295,65 +308,42 @@ impl Worterbuch {
Ok(children)
}

pub async fn subscribe_async(&self, key: Key) -> ConnectionResult<TransactionId> {
let (tx, rx) = oneshot::channel();
self.commands
.send(Command::SubscribeAsync(key, false, tx))
.await?;
let tid = rx.await?;
Ok(tid)
}

pub async fn subscribe_generic(
pub async fn subscribe_async(
&self,
key: Key,
) -> ConnectionResult<(mpsc::UnboundedReceiver<Option<Value>>, TransactionId)> {
let (tid_tx, tid_rx) = oneshot::channel();
let (val_tx, val_rx) = mpsc::unbounded_channel();
self.commands
.send(Command::Subscribe(key, false, tid_tx, val_tx))
.await?;
let transaction_id = tid_rx.await?;
Ok((val_rx, transaction_id))
}

pub async fn subscribe<T: DeserializeOwned + Send + 'static>(
&self,
key: Key,
) -> ConnectionResult<(mpsc::UnboundedReceiver<Option<T>>, TransactionId)> {
let (val_rx, transaction_id) = self.subscribe_generic(key).await?;
let (typed_val_tx, typed_val_rx) = mpsc::unbounded_channel();
spawn(deserialize_values(val_rx, typed_val_tx));
Ok((typed_val_rx, transaction_id))
}

pub async fn subscribe_unique_async(&self, key: Key) -> ConnectionResult<TransactionId> {
unique: bool,
live_only: bool,
) -> ConnectionResult<TransactionId> {
let (tx, rx) = oneshot::channel();
self.commands
.send(Command::SubscribeAsync(key, true, tx))
.send(Command::SubscribeAsync(key, unique, tx, live_only))
.await?;
let tid = rx.await?;
Ok(tid)
}

pub async fn subscribe_unique_generic(
pub async fn subscribe_generic(
&self,
key: Key,
unique: bool,
live_only: bool,
) -> ConnectionResult<(mpsc::UnboundedReceiver<Option<Value>>, TransactionId)> {
let (tid_tx, tid_rx) = oneshot::channel();
let (val_tx, val_rx) = mpsc::unbounded_channel();
self.commands
.send(Command::Subscribe(key, true, tid_tx, val_tx))
.send(Command::Subscribe(key, unique, tid_tx, val_tx, live_only))
.await?;
let transaction_id = tid_rx.await?;
Ok((val_rx, transaction_id))
}

pub async fn subscribe_unique<T: DeserializeOwned + Send + 'static>(
pub async fn subscribe<T: DeserializeOwned + Send + 'static>(
&self,
key: Key,
unique: bool,
live_only: bool,
) -> ConnectionResult<(mpsc::UnboundedReceiver<Option<T>>, TransactionId)> {
let (val_rx, transaction_id) = self.subscribe_unique_generic(key).await?;
let (val_rx, transaction_id) = self.subscribe_generic(key, unique, live_only).await?;
let (typed_val_tx, typed_val_rx) = mpsc::unbounded_channel();
spawn(deserialize_values(val_rx, typed_val_tx));
Ok((typed_val_rx, transaction_id))
Expand All @@ -362,15 +352,18 @@ impl Worterbuch {
pub async fn psubscribe_async(
&self,
request_pattern: RequestPattern,
unique: bool,
live_only: bool,
aggregation_duration: Option<Duration>,
) -> ConnectionResult<TransactionId> {
let (tx, rx) = oneshot::channel();
self.commands
.send(Command::PSubscribeAsync(
request_pattern,
false,
unique,
tx,
aggregation_duration.map(|d| d.as_millis() as u64),
live_only,
))
.await?;
let tid = rx.await?;
Expand All @@ -380,17 +373,20 @@ impl Worterbuch {
pub async fn psubscribe_generic(
&self,
request_pattern: RequestPattern,
unique: bool,
live_only: bool,
aggregation_duration: Option<Duration>,
) -> ConnectionResult<(mpsc::UnboundedReceiver<PStateEvent>, TransactionId)> {
let (tid_tx, tid_rx) = oneshot::channel();
let (event_tx, event_rx) = mpsc::unbounded_channel();
self.commands
.send(Command::PSubscribe(
request_pattern,
false,
unique,
tid_tx,
event_tx,
aggregation_duration.map(|d| d.as_millis() as u64),
live_only,
))
.await?;
let transaction_id = tid_rx.await?;
Expand All @@ -400,61 +396,12 @@ impl Worterbuch {
pub async fn psubscribe<T: DeserializeOwned + Send + 'static>(
&self,
request_pattern: RequestPattern,
unique: bool,
live_only: bool,
aggregation_duration: Option<Duration>,
) -> ConnectionResult<(mpsc::UnboundedReceiver<TypedStateEvents<T>>, TransactionId)> {
let (event_rx, transaction_id) = self
.psubscribe_generic(request_pattern, aggregation_duration)
.await?;
let (typed_event_tx, typed_event_rx) = mpsc::unbounded_channel();
spawn(deserialize_events(event_rx, typed_event_tx));
Ok((typed_event_rx, transaction_id))
}

pub async fn psubscribe_unique_async(
&self,
request_pattern: RequestPattern,
aggregation_duration: Option<Duration>,
) -> ConnectionResult<TransactionId> {
let (tx, rx) = oneshot::channel();
self.commands
.send(Command::PSubscribeAsync(
request_pattern,
true,
tx,
aggregation_duration.map(|d| d.as_millis() as u64),
))
.await?;
let tid = rx.await?;
Ok(tid)
}

pub async fn psubscribe_unique_generic(
&self,
request_pattern: RequestPattern,
aggregation_duration: Option<Duration>,
) -> ConnectionResult<(mpsc::UnboundedReceiver<PStateEvent>, TransactionId)> {
let (tid_tx, tid_rx) = oneshot::channel();
let (event_tx, event_rx) = mpsc::unbounded_channel();
self.commands
.send(Command::PSubscribe(
request_pattern,
true,
tid_tx,
event_tx,
aggregation_duration.map(|d| d.as_millis() as u64),
))
.await?;
let transaction_id = tid_rx.await?;
Ok((event_rx, transaction_id))
}

pub async fn psubscribe_unique<T: DeserializeOwned + Send + 'static>(
&self,
request_pattern: RequestPattern,
aggregation_duration: Option<Duration>,
) -> ConnectionResult<(mpsc::UnboundedReceiver<TypedStateEvents<T>>, TransactionId)> {
let (event_rx, transaction_id) = self
.psubscribe_unique_generic(request_pattern, aggregation_duration)
.psubscribe_generic(request_pattern, unique, live_only, aggregation_duration)
.await?;
let (typed_event_tx, typed_event_rx) = mpsc::unbounded_channel();
spawn(deserialize_events(event_rx, typed_event_tx));
Expand Down Expand Up @@ -947,7 +894,7 @@ async fn process_incoming_command(
parent,
}))
}
Command::Subscribe(key, unique, tid_callback, value_callback) => {
Command::Subscribe(key, unique, tid_callback, value_callback, live_only) => {
callbacks.sub.insert(transaction_id, value_callback);
tid_callback
.send(transaction_id)
Expand All @@ -956,14 +903,16 @@ async fn process_incoming_command(
transaction_id,
key,
unique,
live_only: Some(live_only),
}))
}
Command::SubscribeAsync(key, unique, callback) => {
Command::SubscribeAsync(key, unique, callback, live_only) => {
callback.send(transaction_id).expect("error in callback");
Some(CM::Subscribe(Subscribe {
transaction_id,
key,
unique,
live_only: Some(live_only),
}))
}
Command::PSubscribe(
Expand All @@ -972,6 +921,7 @@ async fn process_incoming_command(
tid_callback,
event_callback,
aggregate_events,
live_only,
) => {
callbacks.psub.insert(transaction_id, event_callback);
tid_callback
Expand All @@ -982,15 +932,23 @@ async fn process_incoming_command(
request_pattern,
unique,
aggregate_events,
live_only: Some(live_only),
}))
}
Command::PSubscribeAsync(request_pattern, unique, callback, aggregate_events) => {
Command::PSubscribeAsync(
request_pattern,
unique,
callback,
aggregate_events,
live_only,
) => {
callback.send(transaction_id).expect("error in callback");
Some(CM::PSubscribe(PSubscribe {
transaction_id,
request_pattern,
unique,
aggregate_events,
live_only: Some(live_only),
}))
}
Command::Unsubscribe(transaction_id) => {
Expand Down
1 change: 1 addition & 0 deletions worterbuch-client/src/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ impl WsClientSocket {

pub async fn send_msg(&mut self, msg: &ClientMessage) -> ConnectionResult<()> {
let json = serde_json::to_string(msg)?;
log::debug!("Sending message: {json}");
let msg = Message::Text(json);
self.websocket.send(msg).await?;
Ok(())
Expand Down
Loading

0 comments on commit 2524fb5

Please sign in to comment.