diff --git a/Cargo.lock b/Cargo.lock index fce81a22..04eacec6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3117,7 +3117,7 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "relay_client" version = "0.1.0" -source = "git+https://github.com/WalletConnect/WalletConnectRust.git?tag=v0.24.0#d2a772f2a23e6ed8219d2f16af7f88878097388a" +source = "git+https://github.com/WalletConnect/WalletConnectRust.git?tag=v0.25.0#b4683d70a46b990433a8d38d8a047f330949f7bf" dependencies = [ "chrono", "futures-channel", @@ -3140,7 +3140,7 @@ dependencies = [ [[package]] name = "relay_rpc" version = "0.1.0" -source = "git+https://github.com/WalletConnect/WalletConnectRust.git?tag=v0.24.0#d2a772f2a23e6ed8219d2f16af7f88878097388a" +source = "git+https://github.com/WalletConnect/WalletConnectRust.git?tag=v0.25.0#b4683d70a46b990433a8d38d8a047f330949f7bf" dependencies = [ "bs58", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 2446852f..9ec28550 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,8 +74,8 @@ futures = "0.3.26" futures-util = "0.3" dashmap = "5.4.0" -relay_rpc = { git = "https://github.com/WalletConnect/WalletConnectRust.git", tag = "v0.24.0", features = ["cacao"] } -relay_client = { git = "https://github.com/WalletConnect/WalletConnectRust.git", tag = "v0.24.0" } +relay_rpc = { git = "https://github.com/WalletConnect/WalletConnectRust.git", tag = "v0.25.0", features = ["cacao"] } +relay_client = { git = "https://github.com/WalletConnect/WalletConnectRust.git", tag = "v0.25.0" } x25519-dalek = { version = "2.0.0", features = ["static_secrets"] } hkdf = "0.12.3" sha2 = "0.10.6" diff --git a/rs-relay b/rs-relay index f7a10ce8..12e48c0e 160000 --- a/rs-relay +++ b/rs-relay @@ -1 +1 @@ -Subproject commit f7a10ce801395bbccabd277779e4d4ef2a59a92c +Subproject commit 12e48c0e7292849b05d6d8c12acb35b2713cf00c diff --git a/src/services/public_http_server/handlers/relay_webhook/mod.rs b/src/services/public_http_server/handlers/relay_webhook/mod.rs index ad6f5224..b0ffcc05 100644 --- a/src/services/public_http_server/handlers/relay_webhook/mod.rs +++ b/src/services/public_http_server/handlers/relay_webhook/mod.rs @@ -106,22 +106,23 @@ pub async fn handler( // TODO check sub let event = claims.evt; - let incoming_message = RelayIncomingMessage { - topic: event.topic, - message: event.message, - tag: event.tag, - }; // TODO send to channel and process in batches state .relay_client .batch_receive(vec![Receipt { - topic: incoming_message.topic.clone(), - message_id: serde_json::from_str("0").unwrap(), // FIXME use actual message ID + topic: event.topic.clone(), + message_id: event.message_id, }]) .await .unwrap(); // TODO remove unwrap + let incoming_message = RelayIncomingMessage { + topic: event.topic, + message: event.message, + tag: event.tag, + }; + if claims.act != WatchAction::WatchEvent { return Err(Error::ClientError(ClientError::WrongWatchAction( claims.act, diff --git a/src/services/relay_renewal_job/register_webhook.rs b/src/services/relay_renewal_job/register_webhook.rs index d3797cb6..60b0d541 100644 --- a/src/services/relay_renewal_job/register_webhook.rs +++ b/src/services/relay_renewal_job/register_webhook.rs @@ -1,11 +1,5 @@ use { - crate::{ - services::public_http_server::RELAY_WEBHOOK_ENDPOINT, - spec::{ - NOTIFY_DELETE_TAG, NOTIFY_GET_NOTIFICATIONS_TAG, NOTIFY_SUBSCRIBE_TAG, - NOTIFY_UPDATE_TAG, NOTIFY_WATCH_SUBSCRIPTIONS_TAG, - }, - }, + crate::{services::public_http_server::RELAY_WEBHOOK_ENDPOINT, spec::INCOMING_TAGS}, relay_client::{ error::Error, http::{Client, WatchRegisterRequest}, @@ -30,13 +24,9 @@ pub async fn run(notify_url: &Url, keypair: &Keypair, client: &Client) -> Result .expect("Should be able to join static URLs") .to_string(), watch_type: WatchType::Subscriber, - tags: vec![ - NOTIFY_SUBSCRIBE_TAG, - NOTIFY_DELETE_TAG, - NOTIFY_UPDATE_TAG, - NOTIFY_WATCH_SUBSCRIPTIONS_TAG, - NOTIFY_GET_NOTIFICATIONS_TAG, - ], + tags: INCOMING_TAGS.to_vec(), + // Alternatively we could not care about the tag, as an incoming message is an incoming message + // tags: (4000..4100).collect(), statuses: vec![WatchStatus::Queued], ttl: Duration::from_secs(60 * 60 * 24 * 30), }, diff --git a/src/spec.rs b/src/spec.rs index 99c4a286..10056b55 100644 --- a/src/spec.rs +++ b/src/spec.rs @@ -28,6 +28,16 @@ pub const NOTIFY_NOOP_TAG: u32 = 4050; pub const NOTIFY_GET_NOTIFICATIONS_TAG: u32 = 4014; pub const NOTIFY_GET_NOTIFICATIONS_RESPONSE_TAG: u32 = 4015; +pub const INCOMING_TAGS: [u32; 7] = [ + NOTIFY_SUBSCRIBE_TAG, + NOTIFY_MESSAGE_RESPONSE_TAG, + NOTIFY_DELETE_TAG, + NOTIFY_UPDATE_TAG, + NOTIFY_WATCH_SUBSCRIPTIONS_TAG, + NOTIFY_SUBSCRIPTIONS_CHANGED_RESPONSE_TAG, + NOTIFY_GET_NOTIFICATIONS_TAG, +]; + // TTLs // https://specs.walletconnect.com/2.0/specs/clients/notify/rpc-methods // https://specs.walletconnect.com/2.0/specs/clients/notify/notify-authentication diff --git a/tests/integration.rs b/tests/integration.rs index 5f8d0617..e08836c7 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -41,6 +41,7 @@ use { notify_message::NotifyMessage, rate_limit::{self, ClockImpl}, registry::{storage::redis::Redis, RegistryAuthResponse}, + relay_client_helpers::create_http_client, rpc::{ decode_key, AuthMessage, NotifyDelete, NotifyRequest, NotifyResponse, NotifyUpdate, ResponseAuth, @@ -917,6 +918,7 @@ struct NotifyServerContext { redis: Arc, #[allow(dead_code)] // must hold onto MockServer reference or it will shut down registry_mock_server: MockServer, + keypair_seed: String, clock: Arc, } @@ -955,6 +957,7 @@ impl AsyncTestContext for NotifyServerContext { .await .unwrap(); let (_, postgres_url) = get_postgres().await; + let keypair_seed = hex::encode(rand::Rng::gen::<[u8; 10]>(&mut rand::thread_rng())); let clock = Arc::new(MockClock::new(Utc::now())); // TODO reuse the local configuration defaults here let config = Configuration { @@ -965,7 +968,7 @@ impl AsyncTestContext for NotifyServerContext { bind_ip, port: bind_port, registry_url: registry_mock_server.uri().parse().unwrap(), - keypair_seed: hex::encode(rand::Rng::gen::<[u8; 10]>(&mut rand::thread_rng())), + keypair_seed: keypair_seed.clone(), project_id: vars.project_id.into(), relay_url, relay_public_key, @@ -1023,6 +1026,7 @@ impl AsyncTestContext for NotifyServerContext { postgres, redis, registry_mock_server, + keypair_seed, clock, } } @@ -8870,6 +8874,7 @@ async fn relay_webhook_rejects_wrong_aud(notify_server: &NotifyServerContext) { typ: WatchType::Subscriber, whu: webhook_url.to_string(), evt: WatchEventPayload { + message_id: serde_json::from_str("0").unwrap(), status: WatchStatus::Queued, topic: Topic::generate(), message: "message".to_owned().into(), @@ -8921,6 +8926,7 @@ async fn relay_webhook_rejects_invalid_signature(notify_server: &NotifyServerCon typ: WatchType::Subscriber, whu: webhook_url.to_string(), evt: WatchEventPayload { + message_id: serde_json::from_str("0").unwrap(), status: WatchStatus::Queued, topic: Topic::generate(), message: "message".to_owned().into(), @@ -8982,6 +8988,7 @@ async fn relay_webhook_rejects_wrong_iss(notify_server: &NotifyServerContext) { typ: WatchType::Subscriber, whu: webhook_url.to_string(), evt: WatchEventPayload { + message_id: serde_json::from_str("0").unwrap(), status: WatchStatus::Queued, topic: Topic::generate(), message: "message".to_owned().into(), @@ -9022,7 +9029,6 @@ async fn relay_webhook_rejects_wrong_iss(notify_server: &NotifyServerContext) { #[test_context(NotifyServerContext)] #[tokio::test] -#[ignore] async fn batch_receive_called(notify_server: &NotifyServerContext) { let (account_signing_key, account) = generate_account(); @@ -9058,7 +9064,7 @@ async fn batch_receive_called(notify_server: &NotifyServerContext) { let vars = get_vars(); let mut relay_client = RelayClient::new( vars.relay_url.parse().unwrap(), - vars.project_id.into(), + vars.project_id.clone().into(), notify_server.url.clone(), ) .await; @@ -9074,10 +9080,26 @@ async fn batch_receive_called(notify_server: &NotifyServerContext) { ) .await; - let response = relay_client - .client + // Cannot poll because .fetch() also removes the messages + + let notify_server_relay_client = { + let keypair_seed = + decode_key(&sha256::digest(notify_server.keypair_seed.as_bytes())).unwrap(); + let keypair = Keypair::generate(&mut StdRng::from_seed(keypair_seed)); + + create_http_client( + &keypair, + vars.relay_url.parse().unwrap(), + notify_server.url.clone(), + vars.project_id.into(), + ) + .unwrap() + }; + + let response = notify_server_relay_client .fetch(topic_from_key(key_agreement.as_bytes())) .await .unwrap(); + println!("fetch response: {response:?}"); assert_eq!(response.messages.len(), 0); }