From eafb1b01d7dbecddd8bbb1e058ff24c404f419bc Mon Sep 17 00:00:00 2001 From: Duy Do Date: Wed, 8 Nov 2023 23:07:02 +0700 Subject: [PATCH] Check for missing commands --- crates/db/src/connection/admin.rs | 14 ++++++++ crates/flow-server/src/main.rs | 53 ++++++++++++++++++++++++++----- 2 files changed, 59 insertions(+), 8 deletions(-) diff --git a/crates/db/src/connection/admin.rs b/crates/db/src/connection/admin.rs index eb739e7f..4f98e371 100644 --- a/crates/db/src/connection/admin.rs +++ b/crates/db/src/connection/admin.rs @@ -28,6 +28,20 @@ impl AdminConn { Self { conn } } + pub async fn get_natives_commands(self) -> crate::Result> { + self.conn + .query( + r#"SELECT data->>'node_id' FROM nodes WHERE type = 'native' AND "isPublic""#, + &[], + ) + .await + .map_err(Error::exec("get_natives_commands"))? + .into_iter() + .map(|r| r.try_get::<_, String>(0)) + .collect::, _>>() + .map_err(Error::data("nodes.data->>'node_id'")) + } + pub async fn copy_in_flow_run_logs(&self, rows: I) -> crate::Result where I: IntoIterator, diff --git a/crates/flow-server/src/main.rs b/crates/flow-server/src/main.rs index b4c4ea39..7f218d02 100644 --- a/crates/flow-server/src/main.rs +++ b/crates/flow-server/src/main.rs @@ -11,8 +11,9 @@ use flow_server::{ user::{SignatureAuth, SupabaseAuth}, wss, Config, }; -use futures_util::future::ok; -use std::convert::Infallible; +use futures_util::{future::ok, TryFutureExt}; +use hashbrown::HashSet; +use std::{borrow::Cow, convert::Infallible}; use utils::address_book::AddressBook; // avoid commands being optimized out by the compiler @@ -83,6 +84,32 @@ async fn main() { } }; + if let DbPool::Real(db) = &db { + let res = db + .get_admin_conn() + .and_then(move |conn| async move { + let names = conn.get_natives_commands().await?; + let mut missing = HashSet::new(); + for name in names { + if !natives.contains(&&Cow::Borrowed(name.as_str())) { + missing.insert(name); + } + } + Ok(missing) + }) + .await; + match res { + Ok(missing) => { + if !missing.is_empty() { + tracing::warn!("missing native commands: {:?}", missing); + } + } + Err(error) => { + tracing::error!("{}", error); + } + } + } + let db_worker = db_worker::DBWorker::new(db.clone(), config.clone(), actors).start(); let sig_auth = SignatureAuth::new(rand::random()); @@ -133,11 +160,18 @@ async fn main() { .service(api::kvstore::write_item::service(&config, db.clone())) .service(api::kvstore::delete_item::service(&config, db.clone())) .service(api::kvstore::read_item::service(&config, db.clone())); - let db_route = web::scope("/proxy") - .service(api::db_rpc::service(&config, db.clone())) - .service(api::db_push_logs::service(&config, db.clone())) - .service(api::auth_proxy::service(&config, db.clone())) - .service(api::ws_auth_proxy::service(&config, db.clone())); + + let db_proxy = if matches!(db, DbPool::Real(_)) { + Some( + web::scope("/proxy") + .service(api::db_rpc::service(&config, db.clone())) + .service(api::db_push_logs::service(&config, db.clone())) + .service(api::auth_proxy::service(&config, db.clone())) + .service(api::ws_auth_proxy::service(&config, db.clone())), + ) + } else { + None + }; let app = App::new() .wrap(Logger::new(r#""%r" %s %b %Dms"#).exclude("/healthcheck")) @@ -153,12 +187,15 @@ async fn main() { app = app.service(auth); } + if let Some(db_proxy) = db_proxy { + app = app.service(db_proxy); + } + app.service(flow) .service(signature) .service(apikeys) .service(websocket) .service(kvstore) - .service(db_route) .service(healthcheck) }) .bind((host, port))