Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: switch to relay webhooks #339

Merged
merged 30 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
91d8ea5
Register webhook and renew all subscriptions on start
chris13524 Jan 26, 2024
6c1a172
fix: use relay webhooks
chris13524 Jan 26, 2024
bc8c5ab
chore: remove tungstenite
chris13524 Jan 26, 2024
fc9bfcc
fix: remove instances of websocket
chris13524 Jan 26, 2024
323a5fc
chore: comment
chris13524 Jan 26, 2024
764f7f4
fix: increase startup timeout
chris13524 Jan 27, 2024
af4e557
chore: relay HTTP client -> relay client
chris13524 Jan 27, 2024
3ac8f7a
fix: further increase socket connect timeout
chris13524 Jan 27, 2024
055c906
fix: increase Postgres connection limit
chris13524 Jan 27, 2024
b95a17c
fix: max 1 connection
chris13524 Jan 27, 2024
bb9af9d
fix: bump max connections
chris13524 Jan 27, 2024
a92ecb5
fix: verify aud
chris13524 Jan 29, 2024
c0029da
fix: verify iss
chris13524 Jan 29, 2024
cb431b0
fix: restore topic unsubscribe
chris13524 Jan 29, 2024
4773abd
chore: test invalid JWT
chris13524 Jan 29, 2024
5aaf9cd
chore: tf docs
chris13524 Jan 29, 2024
bda0e03
chore: test webhook verification
chris13524 Jan 29, 2024
879b44b
chore: test webhook registration
chris13524 Jan 30, 2024
7f849ed
chore: update comments
chris13524 Jan 30, 2024
980eaee
chore: wip batchReceive
chris13524 Jan 30, 2024
a753a0f
chore: fmt
chris13524 Jan 30, 2024
68c9b36
fix: rewrite deployment test
chris13524 Jan 30, 2024
1953773
fix: remove message from mailbox
chris13524 Feb 1, 2024
90ea1db
Merge branch 'main' of https://github.com/WalletConnect/notify-server…
chris13524 Feb 8, 2024
d4d93c8
fix: avoid duplicate webhook registrations
chris13524 Feb 8, 2024
109713c
fix: update to proper spec
chris13524 Feb 8, 2024
630f4b9
feat: batch_receive batcher service
chris13524 Feb 9, 2024
08c24ac
fix: tweaks, fix test
chris13524 Feb 10, 2024
375efd9
chore: better comment
chris13524 Feb 10, 2024
8159fa3
chore: use tagged versions
chris13524 Feb 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/sub-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
- name: "Set some postgres settings"
run: |
docker exec ${{ job.services.postgres.id }} sh -c 'cat /var/lib/postgresql/data/postgresql.conf'
docker exec ${{ job.services.postgres.id }} sh -c 'echo "max_connections=500" >> /var/lib/postgresql/data/postgresql.conf'
docker exec ${{ job.services.postgres.id }} sh -c 'echo "max_connections=1000" >> /var/lib/postgresql/data/postgresql.conf'
docker kill --signal=SIGHUP ${{ job.services.postgres.id }}

- name: Checkout
Expand Down
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ thiserror = "1.0"
async-trait = "0.1"
tokio-stream = "0.1.11"
regex = "1.7.1"
tungstenite = { version = "0.20", features = ["native-tls"] }
url = "2.3.1"
sha256 = "1.1.1"
chacha20poly1305 = "0.10.1"
Expand All @@ -75,8 +74,8 @@ futures = "0.3.26"
futures-util = "0.3"
dashmap = "5.4.0"

relay_rpc = { git = "https://github.com/WalletConnect/WalletConnectRust.git", tag = "v0.26.0", features = ["cacao"] }
relay_client = { git = "https://github.com/WalletConnect/WalletConnectRust.git", tag = "v0.26.0" }
relay_rpc = { git = "https://github.com/WalletConnect/WalletConnectRust.git", branch = "fix/events-is-array-match-specs", features = ["cacao"] }
relay_client = { git = "https://github.com/WalletConnect/WalletConnectRust.git", branch = "fix/events-is-array-match-specs" }
x25519-dalek = { version = "2.0.0", features = ["static_secrets"] }
hkdf = "0.12.3"
sha2 = "0.10.6"
Expand Down
3 changes: 2 additions & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ devloop: unit fmt-imports
popd

just run-storage-docker test-integration

just run &
while ! nc -z 127.0.0.1 3000; do sleep 1; done

just test-deployment

echo "✅ Success! ✅"

# Run project linter
Expand Down
2 changes: 1 addition & 1 deletion rs-relay
4 changes: 3 additions & 1 deletion src/config/deployed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ pub struct DeployedConfiguration {
pub postgres_max_connections: u32,
pub keypair_seed: String,
pub project_id: ProjectId,
/// Websocket URL e.g. wss://relay.walletconnect.com
/// Relay URL e.g. https://relay.walletconnect.com
pub relay_url: Url,
pub relay_public_key: String,
pub notify_url: Url,

pub registry_url: Url,
Expand Down Expand Up @@ -88,6 +89,7 @@ pub fn get_configuration() -> Result<Configuration, NotifyServerError> {
keypair_seed: config.keypair_seed,
project_id: config.project_id,
relay_url: config.relay_url,
relay_public_key: config.relay_public_key,
notify_url: config.notify_url,
registry_url: config.registry_url,
registry_auth_token: config.registry_auth_token,
Expand Down
19 changes: 16 additions & 3 deletions src/config/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,21 +60,33 @@ pub fn default_postgres_max_connections() -> u32 {
}

fn default_keypair_seed() -> String {
hex::encode(rand::Rng::gen::<[u8; 10]>(&mut rand::thread_rng()))
// Use a fixed seed as opposed to a random one for each startup because the server runs on a fixed host and re-uses the database.
// Using a random one will result in:
// - Duplicate relay topic subscriptions for the same topics
// - Duplicate webhook registrations for the same webhook URL
"".to_owned()
// hex::encode(rand::Rng::gen::<[u8; 10]>(&mut rand::thread_rng()))
}

fn default_relay_url() -> Url {
"ws://127.0.0.1:8888".parse().unwrap()
"http://127.0.0.1:8888".parse().unwrap()
}

fn default_registry_url() -> Url {
"https://registry.walletconnect.com".parse().unwrap()
}

pub fn get_configuration() -> Result<Configuration, NotifyServerError> {
pub async fn get_configuration() -> Result<Configuration, NotifyServerError> {
load_dot_env()?;
let config = envy::from_env::<LocalConfiguration>()?;

let relay_public_key = reqwest::get(config.relay_url.join("/public-key").unwrap())
.await
.unwrap()
.text()
.await
.unwrap();

let socket_addr = SocketAddr::from((config.bind_ip, config.port));
let notify_url = format!("http://{socket_addr}").parse::<Url>().unwrap();
let config = Configuration {
Expand All @@ -88,6 +100,7 @@ pub fn get_configuration() -> Result<Configuration, NotifyServerError> {
keypair_seed: config.keypair_seed,
project_id: config.project_id,
relay_url: config.relay_url,
relay_public_key,
registry_url: config.registry_url,
registry_auth_token: config.registry_auth_token,
auth_redis_addr_read: None,
Expand Down
7 changes: 4 additions & 3 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ pub struct Configuration {
pub postgres_max_connections: u32,
pub keypair_seed: String,
pub project_id: ProjectId,
/// Websocket URL e.g. wss://relay.walletconnect.com
/// Relay URL e.g. https://relay.walletconnect.com
pub relay_url: Url,
pub relay_public_key: String,
pub notify_url: Url,

pub registry_url: Url,
Expand Down Expand Up @@ -59,10 +60,10 @@ impl Configuration {
}
}

pub fn get_configuration() -> Result<Configuration, NotifyServerError> {
pub async fn get_configuration() -> Result<Configuration, NotifyServerError> {
if env::var("ENVIRONMENT") == Ok("DEPLOYED".to_owned()) {
deployed::get_configuration()
} else {
local::get_configuration()
local::get_configuration().await
}
}
13 changes: 5 additions & 8 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use {
auth,
model::types::AccountId,
rate_limit::{InternalRateLimitError, RateLimitExceeded},
services::websocket_server::handlers::notify_watch_subscriptions::CheckAppAuthorizationError,
},
axum::{response::IntoResponse, Json},
chacha20poly1305::aead,
Expand All @@ -16,10 +15,14 @@ use {
},
serde_json::json,
std::{array::TryFromSliceError, string::FromUtf8Error, sync::Arc},
thiserror::Error,
tracing::{error, info, warn},
};

#[derive(Debug, thiserror::Error)]
// Import not part of group above because it breaks formatting: https://github.com/rust-lang/rustfmt/issues/4746
use crate::services::public_http_server::handlers::relay_webhook::handlers::notify_watch_subscriptions::CheckAppAuthorizationError;

#[derive(Debug, Error)]
pub enum NotifyServerError {
#[error("Failed to load .env {0}")]
DotEnvy(#[from] dotenvy::Error),
Expand Down Expand Up @@ -63,9 +66,6 @@ pub enum NotifyServerError {
#[error(transparent)]
SerdeJson(#[from] serde_json::error::Error),

#[error(transparent)]
WebSocket(#[from] tungstenite::Error),

#[error(transparent)]
Broadcast(#[from] tokio::sync::broadcast::error::TryRecvError),

Expand Down Expand Up @@ -125,9 +125,6 @@ pub enum NotifyServerError {
#[error("Cryptography failure: {0}")]
EncryptionError(aead::Error),

#[error("Failed to receive on websocket")]
RecvError,

#[error(transparent)]
SystemTimeError(#[from] std::time::SystemTimeError),

Expand Down
44 changes: 24 additions & 20 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use {
metrics::Metrics,
registry::storage::redis::Redis,
relay_client_helpers::create_http_client,
rpc::decode_key,
services::{
private_http_server, public_http_server, publisher_service, watcher_expiration_job,
websocket_server::{self, decode_key},
private_http_server, public_http_server, publisher_service,
relay_mailbox_clearing_service, relay_renewal_job, watcher_expiration_job,
},
state::AppState,
},
Expand Down Expand Up @@ -40,6 +41,7 @@ pub mod publish_relay_message;
pub mod rate_limit;
pub mod registry;
pub mod relay_client_helpers;
pub mod rpc;
pub mod services;
pub mod spec;
pub mod state;
Expand Down Expand Up @@ -70,18 +72,7 @@ pub async fn bootstrap(
.map_err(|_| NotifyServerError::InvalidKeypairSeed)?; // TODO don't ignore error
let keypair = Keypair::generate(&mut StdRng::from_seed(keypair_seed));

let (relay_ws_client, rx) = {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let connection_handler =
services::websocket_server::relay_ws_client::RelayConnectionHandler::new(
"notify-client",
tx,
);
let relay_ws_client = Arc::new(relay_client::websocket::Client::new(connection_handler));
(relay_ws_client, rx)
};

let relay_http_client = Arc::new(create_http_client(
let relay_client = Arc::new(create_http_client(
&keypair,
config.relay_url.clone(),
config.notify_url.clone(),
Expand All @@ -106,21 +97,32 @@ pub async fn bootstrap(
metrics.clone(),
)?);

let (relay_mailbox_clearer_tx, relay_mailbox_clearer_rx) = tokio::sync::mpsc::channel(1000);

let state = Arc::new(AppState::new(
analytics.clone(),
config.clone(),
postgres.clone(),
keypair,
Keypair::from(keypair.secret_key()),
keypair_seed,
relay_ws_client.clone(),
relay_http_client.clone(),
relay_client.clone(),
metrics.clone(),
redis,
registry,
relay_mailbox_clearer_tx,
config.clock,
BlockchainApiProvider::new(config.project_id),
)?);

let relay_renewal_job = relay_renewal_job::start(
state.notify_keys.key_agreement_topic.clone(),
state.config.notify_url.clone(),
keypair,
relay_client.clone(),
postgres.clone(),
metrics.clone(),
)
.await?;
let private_http_server =
private_http_server::start(config.bind_ip, config.telemetry_prometheus_port);
let public_http_server = public_http_server::start(
Expand All @@ -130,22 +132,24 @@ pub async fn bootstrap(
state.clone(),
geoip_resolver,
);
let websocket_server = websocket_server::start(state, relay_ws_client, rx);
let publisher_service = publisher_service::start(
postgres.clone(),
relay_http_client.clone(),
relay_client.clone(),
metrics.clone(),
analytics,
);
let watcher_expiration_job = watcher_expiration_job::start(postgres, metrics);
let batch_receive_service =
relay_mailbox_clearing_service::start(relay_client.clone(), relay_mailbox_clearer_rx);

select! {
_ = shutdown.recv() => info!("Shutdown signal received, killing services"),
e = private_http_server => error!("Private HTTP server terminating with error {e:?}"),
e = public_http_server => error!("Public HTTP server terminating with error {e:?}"),
e = websocket_server => error!("Relay websocket server terminating with error {e:?}"),
e = relay_renewal_job => error!("Relay renewal job terminating with error {e:?}"),
e = publisher_service => error!("Publisher service terminating with error {e:?}"),
e = watcher_expiration_job => error!("Watcher expiration job terminating with error {e:?}"),
e = batch_receive_service => error!("Batch receive service terminating with error {e:?}"),
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use {

#[tokio::main]
async fn main() -> Result<(), NotifyServerError> {
let config = get_configuration()?;
let config = get_configuration().await?;

tracing_subscriber::fmt()
.with_env_filter(&config.log_level)
Expand Down
4 changes: 1 addition & 3 deletions src/model/types/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use {
crate::{
error::NotifyServerError, services::websocket_server::decode_key, utils::get_client_id,
},
crate::{error::NotifyServerError, rpc::decode_key, utils::get_client_id},
chrono::{DateTime, Utc},
relay_rpc::domain::{DecodedClientId, ProjectId, Topic},
sqlx::FromRow,
Expand Down
16 changes: 8 additions & 8 deletions src/publish_relay_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ fn calculate_retry_in(tries: i32) -> Duration {

#[instrument(skip_all, fields(topic = %publish.topic, tag = %publish.tag, message_id = %get_message_id(&publish.message)))]
pub async fn publish_relay_message(
relay_http_client: &Client,
relay_client: &Client,
publish: &Publish,
metrics: Option<&Metrics>,
) -> Result<(), Error> {
Expand All @@ -35,7 +35,7 @@ pub async fn publish_relay_message(

let client_publish_call = || async {
let start = Instant::now();
let result = relay_http_client
let result = relay_client
.publish(
publish.topic.clone(),
publish.message.clone(),
Expand Down Expand Up @@ -83,9 +83,9 @@ pub async fn publish_relay_message(
Ok(())
}

#[instrument(skip(relay_ws_client, metrics))]
#[instrument(skip(relay_client, metrics))]
pub async fn subscribe_relay_topic(
relay_ws_client: &relay_client::websocket::Client,
relay_client: &Client,
topic: &Topic,
metrics: Option<&Metrics>,
) -> Result<(), Error> {
Expand All @@ -94,7 +94,7 @@ pub async fn subscribe_relay_topic(

let client_publish_call = || async {
let start = Instant::now();
let result = relay_ws_client.subscribe_blocking(topic.clone()).await;
let result = relay_client.subscribe_blocking(topic.clone()).await;
if let Some(metrics) = metrics {
metrics.relay_subscribe_request(start);
}
Expand Down Expand Up @@ -136,9 +136,9 @@ pub async fn subscribe_relay_topic(
Ok(())
}

#[instrument(skip(relay_http_client, metrics))]
#[instrument(skip(relay_client, metrics))]
pub async fn extend_subscription_ttl(
relay_http_client: &Client,
relay_client: &Client,
topic: Topic,
metrics: Option<&Metrics>,
) -> Result<(), Error> {
Expand All @@ -155,7 +155,7 @@ pub async fn extend_subscription_ttl(
ttl_secs: NOTIFY_NOOP_TTL.as_secs() as u32,
prompt: false,
};
publish_relay_message(relay_http_client, &publish, metrics).await
publish_relay_message(relay_client, &publish, metrics).await
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion src/rate_limit/token_bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
crate::{
error::NotifyServerError,
registry::storage::redis::Redis,
services::websocket_server::error::{
services::public_http_server::handlers::relay_webhook::error::{
RelayMessageClientError, RelayMessageError, RelayMessageServerError,
},
},
Expand Down
Loading
Loading