diff --git a/Cargo.lock b/Cargo.lock index 7f2f8be9..6a06de8b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3028,6 +3028,7 @@ dependencies = [ "thegraph", "thiserror", "tokio", + "tokio-util", "tower", "tower_governor", "tracing", @@ -6472,9 +6473,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.9" +version = "0.7.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d" +checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" dependencies = [ "bytes", "futures-core", diff --git a/common/Cargo.toml b/common/Cargo.toml index 79587efd..d67461ea 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -45,6 +45,7 @@ autometrics = { version = "0.6.0", features = ["prometheus-exporter"] } tracing = "0.1.40" tower = "0.4.13" tower_governor = "0.1.0" +tokio-util = "0.7.10" [dev-dependencies] env_logger = "0.9.0" diff --git a/common/src/tap_manager.rs b/common/src/tap_manager.rs index d134a1e5..02aab736 100644 --- a/common/src/tap_manager.rs +++ b/common/src/tap_manager.rs @@ -23,6 +23,7 @@ pub struct TapManager { domain_separator: Arc, sender_denylist: Arc>>, _sender_denylist_watcher_handle: Arc>, + sender_denylist_watcher_cancel_token: Arc, } impl TapManager { @@ -49,10 +50,13 @@ impl TapManager { .await .expect("should be able to fetch the sender_denylist from the DB on startup"); + let sender_denylist_watcher_cancel_token = + Arc::new(tokio_util::sync::CancellationToken::new()); let sender_denylist_watcher_handle = Arc::new(tokio::spawn(Self::sender_denylist_watcher( pgpool.clone(), pglistener, sender_denylist.clone(), + sender_denylist_watcher_cancel_token.clone(), ))); Self { @@ -62,6 +66,7 @@ impl TapManager { domain_separator: Arc::new(domain_separator), sender_denylist, _sender_denylist_watcher_handle: sender_denylist_watcher_handle, + sender_denylist_watcher_cancel_token, } } @@ -169,6 +174,7 @@ impl TapManager { pgpool: PgPool, mut pglistener: PgListener, denylist: Arc>>, + cancel_token: Arc, ) { #[derive(serde::Deserialize)] struct DenylistNotification { @@ -177,52 +183,63 @@ impl TapManager { } loop { - let pg_notification = pglistener.recv().await.expect( - "should be able to receive Postgres Notify events on the channel \ - 'scalar_tap_deny_notification'", - ); - - println!( - "Received a denylist table notification: {}", - pg_notification.payload() - ); - - let denylist_notification: DenylistNotification = - serde_json::from_str(pg_notification.payload()).expect( - "should be able to deserialize the Postgres Notify event payload as a \ - DenylistNotification", - ); - - match denylist_notification.tg_op.as_str() { - "INSERT" => { - denylist - .write() - .await - .insert(denylist_notification.sender_address); + tokio::select! { + _ = cancel_token.cancelled() => { + break; } - "DELETE" => { - denylist - .write() - .await - .remove(&denylist_notification.sender_address); - } - // UPDATE and TRUNCATE are not expected to happen. Reload the entire denylist. - _ => { - error!( - "Received an unexpected denylist table notification: {}. Reloading entire \ - denylist.", - denylist_notification.tg_op + + pg_notification = pglistener.recv() => { + let pg_notification = pg_notification.expect( + "should be able to receive Postgres Notify events on the channel \ + 'scalar_tap_deny_notification'", ); - Self::sender_denylist_reload(pgpool.clone(), denylist.clone()) - .await - .expect("should be able to reload the sender denylist") + let denylist_notification: DenylistNotification = + serde_json::from_str(pg_notification.payload()).expect( + "should be able to deserialize the Postgres Notify event payload as a \ + DenylistNotification", + ); + + match denylist_notification.tg_op.as_str() { + "INSERT" => { + denylist + .write() + .await + .insert(denylist_notification.sender_address); + } + "DELETE" => { + denylist + .write() + .await + .remove(&denylist_notification.sender_address); + } + // UPDATE and TRUNCATE are not expected to happen. Reload the entire denylist. + _ => { + error!( + "Received an unexpected denylist table notification: {}. Reloading entire \ + denylist.", + denylist_notification.tg_op + ); + + Self::sender_denylist_reload(pgpool.clone(), denylist.clone()) + .await + .expect("should be able to reload the sender denylist") + } + } } } } } } +impl Drop for TapManager { + fn drop(&mut self) { + // Clean shutdown for the sender_denylist_watcher + // Though since it's not a critical task, we don't wait for it to finish (join). + self.sender_denylist_watcher_cancel_token.cancel(); + } +} + #[cfg(test)] mod test { use std::str::FromStr;