Skip to content

Commit

Permalink
Check for missing commands
Browse files Browse the repository at this point in the history
  • Loading branch information
juchiast committed Nov 8, 2023
1 parent 101440e commit eafb1b0
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 8 deletions.
14 changes: 14 additions & 0 deletions crates/db/src/connection/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ impl AdminConn {
Self { conn }
}

pub async fn get_natives_commands(self) -> crate::Result<Vec<String>> {
self.conn
.query(
r#"SELECT data->>'node_id' FROM nodes WHERE type = 'native' AND "isPublic""#,
&[],
)
.await
.map_err(Error::exec("get_natives_commands"))?
.into_iter()
.map(|r| r.try_get::<_, String>(0))
.collect::<Result<Vec<_>, _>>()
.map_err(Error::data("nodes.data->>'node_id'"))
}

pub async fn copy_in_flow_run_logs<I>(&self, rows: I) -> crate::Result<u64>
where
I: IntoIterator,
Expand Down
53 changes: 45 additions & 8 deletions crates/flow-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ use flow_server::{
user::{SignatureAuth, SupabaseAuth},
wss, Config,
};
use futures_util::future::ok;
use std::convert::Infallible;
use futures_util::{future::ok, TryFutureExt};
use hashbrown::HashSet;
use std::{borrow::Cow, convert::Infallible};
use utils::address_book::AddressBook;

// avoid commands being optimized out by the compiler
Expand Down Expand Up @@ -83,6 +84,32 @@ async fn main() {
}
};

if let DbPool::Real(db) = &db {
let res = db
.get_admin_conn()
.and_then(move |conn| async move {
let names = conn.get_natives_commands().await?;
let mut missing = HashSet::new();
for name in names {
if !natives.contains(&&Cow::Borrowed(name.as_str())) {
missing.insert(name);
}
}
Ok(missing)
})
.await;
match res {
Ok(missing) => {
if !missing.is_empty() {
tracing::warn!("missing native commands: {:?}", missing);
}
}
Err(error) => {
tracing::error!("{}", error);
}
}
}

let db_worker = db_worker::DBWorker::new(db.clone(), config.clone(), actors).start();

let sig_auth = SignatureAuth::new(rand::random());
Expand Down Expand Up @@ -133,11 +160,18 @@ async fn main() {
.service(api::kvstore::write_item::service(&config, db.clone()))
.service(api::kvstore::delete_item::service(&config, db.clone()))
.service(api::kvstore::read_item::service(&config, db.clone()));
let db_route = web::scope("/proxy")
.service(api::db_rpc::service(&config, db.clone()))
.service(api::db_push_logs::service(&config, db.clone()))
.service(api::auth_proxy::service(&config, db.clone()))
.service(api::ws_auth_proxy::service(&config, db.clone()));

let db_proxy = if matches!(db, DbPool::Real(_)) {
Some(
web::scope("/proxy")
.service(api::db_rpc::service(&config, db.clone()))
.service(api::db_push_logs::service(&config, db.clone()))
.service(api::auth_proxy::service(&config, db.clone()))
.service(api::ws_auth_proxy::service(&config, db.clone())),
)
} else {
None
};

let app = App::new()
.wrap(Logger::new(r#""%r" %s %b %Dms"#).exclude("/healthcheck"))
Expand All @@ -153,12 +187,15 @@ async fn main() {
app = app.service(auth);
}

if let Some(db_proxy) = db_proxy {
app = app.service(db_proxy);
}

app.service(flow)
.service(signature)
.service(apikeys)
.service(websocket)
.service(kvstore)
.service(db_route)
.service(healthcheck)
})
.bind((host, port))
Expand Down

0 comments on commit eafb1b0

Please sign in to comment.