From 087a69e4c42b9ec70f1944fbfa66157effc2b5be Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Mon, 19 Feb 2024 12:36:38 -0500 Subject: [PATCH 1/2] fix: don't retry some relay errors --- Cargo.lock | 27 +++++++++++++-- Cargo.toml | 4 +-- src/error.rs | 13 ++++++- src/publish_relay_message.rs | 34 +++++++++++++++---- .../refresh_topic_subscriptions.rs | 2 ++ .../relay_renewal_job/register_webhook.rs | 8 +++-- 6 files changed, 75 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 99631660..89f415b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3682,7 +3682,7 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "relay_client" version = "0.1.0" -source = "git+https://github.com/WalletConnect/WalletConnectRust.git?tag=v0.26.4#5d8a12a1fe2ae2df51c512e82789fbdeb9e55766" +source = "git+https://github.com/WalletConnect/WalletConnectRust.git?branch=feat/error-overhaul#8be268ddaca2d18cdcc5f388844eff6f54907271" dependencies = [ "chrono", "futures-channel", @@ -3705,7 +3705,7 @@ dependencies = [ [[package]] name = "relay_rpc" version = "0.1.0" -source = "git+https://github.com/WalletConnect/WalletConnectRust.git?tag=v0.26.4#5d8a12a1fe2ae2df51c512e82789fbdeb9e55766" +source = "git+https://github.com/WalletConnect/WalletConnectRust.git?branch=feat/error-overhaul#8be268ddaca2d18cdcc5f388844eff6f54907271" dependencies = [ "alloy-json-abi", "alloy-json-rpc", @@ -3731,6 +3731,7 @@ dependencies = [ "serde_json", "sha2", "sha3", + "strum", "thiserror", "url", ] @@ -4591,6 +4592,28 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "strum" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "723b93e8addf9aa965ebe2d11da6d7540fa2283fcea14b3371ff055f7ba13f5f" +dependencies = [ + "strum_macros", +] + +[[package]] +name = "strum_macros" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a3417fc93d76740d974a01654a09777cb500428cc874ca9f45edfe0c4d4cd18" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.38", +] + [[package]] name = "subtle" version = "2.5.0" diff --git a/Cargo.toml b/Cargo.toml index 875b2086..9a979711 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,8 +72,8 @@ futures = "0.3.26" futures-util = "0.3" dashmap = "5.4.0" -relay_rpc = { git = "https://github.com/WalletConnect/WalletConnectRust.git", tag = "v0.26.4", features = ["cacao"] } -relay_client = { git = "https://github.com/WalletConnect/WalletConnectRust.git", tag = "v0.26.4" } +relay_rpc = { git = "https://github.com/WalletConnect/WalletConnectRust.git", branch = "feat/error-overhaul", features = ["cacao"] } +relay_client = { git = "https://github.com/WalletConnect/WalletConnectRust.git", branch = "feat/error-overhaul" } x25519-dalek = { version = "2.0.0", features = ["static_secrets"] } hkdf = "0.12.3" sha2 = "0.10.6" diff --git a/src/error.rs b/src/error.rs index fc8e0254..4886a709 100644 --- a/src/error.rs +++ b/src/error.rs @@ -9,9 +9,11 @@ use { chacha20poly1305::aead, data_encoding::DecodeError, hyper::StatusCode, + relay_client::error::ClientError, relay_rpc::{ auth::{did::DidError, ed25519_dalek::ed25519}, domain::{ClientIdDecodingError, ProjectId, Topic}, + rpc::{PublishError, SubscriptionError, WatchError}, }, serde_json::json, std::{array::TryFromSliceError, string::FromUtf8Error, sync::Arc}, @@ -79,7 +81,16 @@ pub enum NotifyServerError { Base64Decode(#[from] base64::DecodeError), #[error(transparent)] - WalletConnectClient(#[from] relay_client::error::Error), + RelayClientError(#[from] ClientError), + + #[error(transparent)] + RelaySubscribeError(#[from] relay_client::error::Error), + + #[error(transparent)] + RelayPublishError(#[from] relay_client::error::Error), + + #[error(transparent)] + RelayWatchError(#[from] relay_client::error::Error), #[error(transparent)] TryRecvError(#[from] tokio::sync::mpsc::error::TryRecvError), diff --git a/src/publish_relay_message.rs b/src/publish_relay_message.rs index df52f38d..3c975d88 100644 --- a/src/publish_relay_message.rs +++ b/src/publish_relay_message.rs @@ -6,7 +6,7 @@ use { relay_client::{error::Error, http::Client}, relay_rpc::{ domain::Topic, - rpc::{msg_id::get_message_id, Publish}, + rpc::{self, msg_id::get_message_id, Publish, PublishError, SubscriptionError}, }, std::{ sync::{Arc, OnceLock}, @@ -29,7 +29,7 @@ pub async fn publish_relay_message( relay_client: &Client, publish: &Publish, metrics: Option<&Metrics>, -) -> Result<(), Error> { +) -> Result<(), Error> { info!("publish_relay_message"); let start = Instant::now(); @@ -47,7 +47,16 @@ pub async fn publish_relay_message( if let Some(metrics) = metrics { metrics.relay_outgoing_message_publish(publish.tag, start); } - result + match result { + Ok(_) => Ok(()), + Err(e) => match e { + Error::Response(rpc::Error::Handler(PublishError::MailboxLimitExceeded)) => { + info!("Mailbox limit exceeded for topic {}", publish.topic); + Ok(()) + } + e => Err(e), + }, + } }; let mut tries = 0; @@ -88,7 +97,7 @@ pub async fn subscribe_relay_topic( relay_client: &Client, topic: &Topic, metrics: Option<&Metrics>, -) -> Result<(), Error> { +) -> Result<(), Error> { info!("subscribe_relay_topic"); let start = Instant::now(); @@ -98,7 +107,20 @@ pub async fn subscribe_relay_topic( if let Some(metrics) = metrics { metrics.relay_subscribe_request(start); } - result + match result { + Ok(_) => Ok(()), + Err(e) => match e { + Error::Response(rpc::Error::Handler( + SubscriptionError::SubscriberLimitExceeded, + )) => { + // FIXME figure out how to handle this properly; being unable to subscribe means a broken state + // https://walletconnect.slack.com/archives/C058RS0MH38/p1708183383748259 + warn!("Subscriber limit exceeded for topic {topic}"); + Ok(()) + } + e => Err(e), + }, + } }; let mut tries = 0; @@ -141,7 +163,7 @@ pub async fn extend_subscription_ttl( relay_client: &Client, topic: Topic, metrics: Option<&Metrics>, -) -> Result<(), Error> { +) -> Result<(), Error> { info!("extend_subscription_ttl"); // Extremely minor performance optimization with OnceLock to avoid allocating the same empty string everytime diff --git a/src/services/relay_renewal_job/refresh_topic_subscriptions.rs b/src/services/relay_renewal_job/refresh_topic_subscriptions.rs index 8bf5a3dd..1d0b58ed 100644 --- a/src/services/relay_renewal_job/refresh_topic_subscriptions.rs +++ b/src/services/relay_renewal_job/refresh_topic_subscriptions.rs @@ -71,10 +71,12 @@ pub async fn run( // Subscribe a second time as the initial subscription above may have expired subscribe_relay_topic(client, &topic, metrics) .map_ok(|_| ()) + .map_err(NotifyServerError::from) .and_then(|_| { // Subscribing only guarantees 5m TTL, so we always need to extend it. extend_subscription_ttl(client, topic.clone(), metrics) .map_ok(|_| ()) + .map_err(Into::into) }) .await }) diff --git a/src/services/relay_renewal_job/register_webhook.rs b/src/services/relay_renewal_job/register_webhook.rs index b497112c..3cd70d57 100644 --- a/src/services/relay_renewal_job/register_webhook.rs +++ b/src/services/relay_renewal_job/register_webhook.rs @@ -6,7 +6,7 @@ use { }, relay_rpc::{ auth::ed25519_dalek::SigningKey, - rpc::{WatchStatus, WatchType}, + rpc::{WatchError, WatchStatus, WatchType}, }, std::time::Duration, tracing::instrument, @@ -14,7 +14,11 @@ use { }; #[instrument(skip_all)] -pub async fn run(notify_url: &Url, keypair: &SigningKey, client: &Client) -> Result<(), Error> { +pub async fn run( + notify_url: &Url, + keypair: &SigningKey, + client: &Client, +) -> Result<(), Error> { client .watch_register( WatchRegisterRequest { From 16e6ca7cca7f2b2acee34fa6f44228d29a4d3e9a Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Mon, 19 Feb 2024 12:41:22 -0500 Subject: [PATCH 2/2] chore: comment --- src/publish_relay_message.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/publish_relay_message.rs b/src/publish_relay_message.rs index 3c975d88..ec5dc999 100644 --- a/src/publish_relay_message.rs +++ b/src/publish_relay_message.rs @@ -51,6 +51,9 @@ pub async fn publish_relay_message( Ok(_) => Ok(()), Err(e) => match e { Error::Response(rpc::Error::Handler(PublishError::MailboxLimitExceeded)) => { + // Only happens if there is no subscriber for the topic in the first place. + // So client is not expecting a response, or in the case of notify messages + // should use getNotifications to get old notifications anyway. info!("Mailbox limit exceeded for topic {}", publish.topic); Ok(()) }