Skip to content

Commit

Permalink
fix: mark notifications as read should notify subscription watchers
Browse files Browse the repository at this point in the history
  • Loading branch information
chris13524 committed Oct 9, 2024
1 parent 711959e commit a982713
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use {
rpc::{decode_key, AuthMessage, JsonRpcRequest, JsonRpcResponse, JsonRpcResponseError},
services::public_http_server::handlers::relay_webhook::{
error::{RelayMessageClientError, RelayMessageError, RelayMessageServerError},
handlers::decrypt_message,
handlers::{decrypt_message, notify_watch_subscriptions::send_to_subscription_watchers},
RelayIncomingMessage,
},
spec::{
Expand Down Expand Up @@ -214,26 +214,46 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
.map_err(RelayMessageServerError::JsonRpcResponseErrorSerialization)?,
};

let envelope = Envelope::<EnvelopeType0>::new(&sym_key, response)
.map_err(RelayMessageServerError::EnvelopeEncryption)?;
let response_fut = async {
let envelope = Envelope::<EnvelopeType0>::new(&sym_key, response)
.map_err(RelayMessageServerError::EnvelopeEncryption)?;
let response = base64::engine::general_purpose::STANDARD.encode(envelope.to_bytes());
publish_relay_message(
&state.relay_client,
&Publish {
topic: msg.topic.clone(),
message: response.into(),
tag: NOTIFY_MARK_NOTIFICATIONS_AS_READ_RESPONSE_TAG,
ttl_secs: NOTIFY_MARK_NOTIFICATIONS_AS_READ_RESPONSE_TTL.as_secs() as u32,
prompt: false,
},
Some(Arc::new(msg)),
state.metrics.as_ref(),
&state.analytics,
)
.await
.map_err(|e| RelayMessageServerError::NotifyServer(e.into())) // TODO change to client error?
};

let response = base64::engine::general_purpose::STANDARD.encode(envelope.to_bytes());
if let Some(watchers_with_subscriptions) = watchers_with_subscriptions {
let watcher_fut = async {
send_to_subscription_watchers(
watchers_with_subscriptions,
&state.notify_keys.authentication_secret,
&state.notify_keys.authentication_client_id,
&state.relay_client,
msg,
state.metrics.as_ref(),
&state.analytics,
)
.await
.map_err(RelayMessageServerError::SubscriptionWatcherSend)
};

publish_relay_message(
&state.relay_client,
&Publish {
topic: msg.topic.clone(),
message: response.into(),
tag: NOTIFY_MARK_NOTIFICATIONS_AS_READ_RESPONSE_TAG,
ttl_secs: NOTIFY_MARK_NOTIFICATIONS_AS_READ_RESPONSE_TTL.as_secs() as u32,
prompt: false,
},
Some(Arc::new(msg)),
state.metrics.as_ref(),
&state.analytics,
)
.await
.map_err(|e| RelayMessageServerError::NotifyServer(e.into()))?; // TODO change to client error?
tokio::try_join!(response_fut, watcher_fut)?;
} else {
response_fut.await?;
}

result.map(|_| ())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
topic: msg.topic.clone(),
});

// Problem: this doesn't send an update to the watcher that made the change (by design) but
// the request was to send these updates to all watchers for mark_notifications_as_read
let (sbs, watchers_with_subscriptions) = prepare_subscription_watchers(
&request_iss_client_id,
&request_auth.shared_claims.mjv,
Expand Down

0 comments on commit a982713

Please sign in to comment.