diff --git a/src/services/public_http_server/handlers/relay_webhook/handlers/notify_mark_notifications_as_read.rs b/src/services/public_http_server/handlers/relay_webhook/handlers/notify_mark_notifications_as_read.rs index 8762e6c9..14eb52ee 100644 --- a/src/services/public_http_server/handlers/relay_webhook/handlers/notify_mark_notifications_as_read.rs +++ b/src/services/public_http_server/handlers/relay_webhook/handlers/notify_mark_notifications_as_read.rs @@ -19,7 +19,7 @@ use { rpc::{decode_key, AuthMessage, JsonRpcRequest, JsonRpcResponse, JsonRpcResponseError}, services::public_http_server::handlers::relay_webhook::{ error::{RelayMessageClientError, RelayMessageError, RelayMessageServerError}, - handlers::decrypt_message, + handlers::{decrypt_message, notify_watch_subscriptions::send_to_subscription_watchers}, RelayIncomingMessage, }, spec::{ @@ -214,26 +214,46 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R .map_err(RelayMessageServerError::JsonRpcResponseErrorSerialization)?, }; - let envelope = Envelope::::new(&sym_key, response) - .map_err(RelayMessageServerError::EnvelopeEncryption)?; + let response_fut = async { + let envelope = Envelope::::new(&sym_key, response) + .map_err(RelayMessageServerError::EnvelopeEncryption)?; + let response = base64::engine::general_purpose::STANDARD.encode(envelope.to_bytes()); + publish_relay_message( + &state.relay_client, + &Publish { + topic: msg.topic.clone(), + message: response.into(), + tag: NOTIFY_MARK_NOTIFICATIONS_AS_READ_RESPONSE_TAG, + ttl_secs: NOTIFY_MARK_NOTIFICATIONS_AS_READ_RESPONSE_TTL.as_secs() as u32, + prompt: false, + }, + Some(Arc::new(msg)), + state.metrics.as_ref(), + &state.analytics, + ) + .await + .map_err(|e| RelayMessageServerError::NotifyServer(e.into())) // TODO change to client error? + }; - let response = base64::engine::general_purpose::STANDARD.encode(envelope.to_bytes()); + if let Some(watchers_with_subscriptions) = watchers_with_subscriptions { + let watcher_fut = async { + send_to_subscription_watchers( + watchers_with_subscriptions, + &state.notify_keys.authentication_secret, + &state.notify_keys.authentication_client_id, + &state.relay_client, + msg, + state.metrics.as_ref(), + &state.analytics, + ) + .await + .map_err(RelayMessageServerError::SubscriptionWatcherSend) + }; - publish_relay_message( - &state.relay_client, - &Publish { - topic: msg.topic.clone(), - message: response.into(), - tag: NOTIFY_MARK_NOTIFICATIONS_AS_READ_RESPONSE_TAG, - ttl_secs: NOTIFY_MARK_NOTIFICATIONS_AS_READ_RESPONSE_TTL.as_secs() as u32, - prompt: false, - }, - Some(Arc::new(msg)), - state.metrics.as_ref(), - &state.analytics, - ) - .await - .map_err(|e| RelayMessageServerError::NotifyServer(e.into()))?; // TODO change to client error? + tokio::try_join!(response_fut, watcher_fut)?; + } else { + response_fut.await?; + } result.map(|_| ()) } diff --git a/src/services/public_http_server/handlers/relay_webhook/handlers/notify_update.rs b/src/services/public_http_server/handlers/relay_webhook/handlers/notify_update.rs index 720898e3..0219327e 100644 --- a/src/services/public_http_server/handlers/relay_webhook/handlers/notify_update.rs +++ b/src/services/public_http_server/handlers/relay_webhook/handlers/notify_update.rs @@ -189,6 +189,8 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R topic: msg.topic.clone(), }); + // Problem: this doesn't send an update to the watcher that made the change (by design) but + // the request was to send these updates to all watchers for mark_notifications_as_read let (sbs, watchers_with_subscriptions) = prepare_subscription_watchers( &request_iss_client_id, &request_auth.shared_claims.mjv,