diff --git a/crates/db/src/connection/conn_impl.rs b/crates/db/src/connection/conn_impl.rs index 8e51fe4b..d4bacce8 100644 --- a/crates/db/src/connection/conn_impl.rs +++ b/crates/db/src/connection/conn_impl.rs @@ -256,14 +256,15 @@ impl UserConnection { nodes, edges, collect_instructions, - partial_config) + partial_config, + signers) VALUES ( gen_random_uuid(), $1, $2, jsonb_build_object('M', $3::JSONB), $4, $5, jsonb_build_object('SOL', $6::JSONB), - $7, $8, $9, $10, $11, $12) + $7, $8, $9, $10, $11, $12, $13) RETURNING id", ) .await @@ -294,6 +295,7 @@ impl UserConnection { &config.edges.iter().map(Json).collect::>(), &config.collect_instructions, &config.partial_config.as_ref().map(Json), + &Json(&config.signers), ], ) .await diff --git a/crates/flow-server/src/db_worker/signer.rs b/crates/flow-server/src/db_worker/signer.rs index 316b3d84..20fd99b6 100644 --- a/crates/flow-server/src/db_worker/signer.rs +++ b/crates/flow-server/src/db_worker/signer.rs @@ -6,6 +6,7 @@ use flow_lib::{ }; use futures_util::FutureExt; use hashbrown::{hash_map::Entry, HashMap}; +use serde_json::Value as JsonValue; use solana_sdk::{pubkey::Pubkey, signature::Keypair, signer::Signer}; use std::future::ready; @@ -118,12 +119,27 @@ impl SignerWorker { Ok(Self { signers }) } - pub async fn fetch_and_start<'a, I>(db: DbPool, users: I) -> Result, DbError> + pub async fn fetch_and_start<'a, I>( + db: DbPool, + users: I, + ) -> Result<(actix::Addr, JsonValue), DbError> where I: IntoIterator)>, { let signer = Self::fetch(db, users).await?; - tracing::debug!("using signers: {:#?}", signer.signers); - Ok(signer.start()) + let signers_info = signer + .signers + .iter() + .map(|(pk, w)| { + ( + pk.to_string(), + match w { + SignerType::Keypair(_) => "HARDCODED".to_owned(), + SignerType::UserWallet { user_id, .. } => user_id.to_string(), + }, + ) + }) + .collect::(); + Ok((signer.start(), signers_info.into())) } } diff --git a/crates/flow-server/src/db_worker/user_worker.rs b/crates/flow-server/src/db_worker/user_worker.rs index 980e6559..b5167b02 100644 --- a/crates/flow-server/src/db_worker/user_worker.rs +++ b/crates/flow-server/src/db_worker/user_worker.rs @@ -486,14 +486,14 @@ impl actix::Handler for UserWorker { }) .await??; - let signer = + let (signer, signers_info) = SignerWorker::fetch_and_start(db, &[(user_id, addr.clone().recipient())]).await?; let r = FlowRegistry::from_actix( msg.user, Vec::new(), msg.flow_id, - signer.recipient(), + (signer.recipient(), signers_info), addr.clone().recipient(), addr.clone().recipient(), addr.clone().recipient(), @@ -562,7 +562,7 @@ impl actix::Handler for UserWorker { }) .await??; - let signer = SignerWorker::fetch_and_start( + let (signer, signers_info) = SignerWorker::fetch_and_start( db, &[ (msg.started_by.0, msg.started_by.1.recipient()), @@ -575,7 +575,7 @@ impl actix::Handler for UserWorker { User { id: user_id }, [msg.started_by.0].into(), msg.flow_id, - signer.recipient(), + (signer.recipient(), signers_info), addr.clone().recipient(), addr.clone().recipient(), addr.clone().recipient(), diff --git a/crates/flow/src/flow_registry.rs b/crates/flow/src/flow_registry.rs index f09fdddc..01549383 100644 --- a/crates/flow/src/flow_registry.rs +++ b/crates/flow/src/flow_registry.rs @@ -13,6 +13,7 @@ use flow_lib::{ CommandType, FlowConfig, FlowId, FlowRunId, NodeId, UserId, ValueSet, }; use hashbrown::HashMap; +use serde_json::Value as JsonValue; use std::sync::Arc; use thiserror::Error as ThisError; use tracing::instrument::WithSubscriber; @@ -35,6 +36,7 @@ pub struct FlowRegistry { user: User, shared_with: Vec, flows: Arc>, + signers_info: JsonValue, endpoints: Endpoints, signer: signer::Svc, token: get_jwt::Svc, @@ -54,6 +56,7 @@ impl Default for FlowRegistry { shared_with: <_>::default(), flows: Arc::new(HashMap::new()), endpoints: <_>::default(), + signers_info: <_>::default(), signer, token, new_flow_run, @@ -121,7 +124,7 @@ impl FlowRegistry { user: User, shared_with: Vec, entrypoint: FlowId, - signer: signer::Svc, + (signer, signers_info): (signer::Svc, JsonValue), new_flow_run: new_flow_run::Svc, get_flow: get_flow::Svc, get_previous_values: get_previous_values::Svc, @@ -136,6 +139,7 @@ impl FlowRegistry { shared_with, flows: Arc::new(flows), signer, + signers_info, new_flow_run, get_previous_values, token, @@ -147,7 +151,7 @@ impl FlowRegistry { user: User, shared_with: Vec, entrypoint: FlowId, - signer: actix::Recipient, + (signer, signers_info): (actix::Recipient, JsonValue), new_flow_run: actix::Recipient, get_flow: actix::Recipient, get_previous_values: actix::Recipient, @@ -159,7 +163,10 @@ impl FlowRegistry { user, shared_with, entrypoint, - TowerClient::from_service(ActixService::from(signer), signer::Error::Worker, 16), + ( + TowerClient::from_service(ActixService::from(signer), signer::Error::Worker, 16), + signers_info, + ), TowerClient::from_service( ActixService::from(new_flow_run), new_flow_run::Error::Worker, @@ -224,6 +231,7 @@ impl FlowRegistry { } else { config.instructions_bundling.clone() }, + signers: self.signers_info.clone(), ..config.clone() }, inputs: inputs.clone(), diff --git a/lib/flow-lib/Cargo.toml b/lib/flow-lib/Cargo.toml index feacdbe9..163f9245 100644 --- a/lib/flow-lib/Cargo.toml +++ b/lib/flow-lib/Cargo.toml @@ -15,7 +15,7 @@ uuid = { version = "1.0.0", features = ["v4", "serde"] } serde = { version = "1.0.137", features = ["derive"] } serde_json = "1" serde_with = "2.2" -tower = { version = "0.4", features = ["buffer", "util"] } +tower = { version = "0.4", features = ["buffer", "util", "retry"] } tracing = "0.1" pin-project-lite = "0.2" actix = "0.13" diff --git a/lib/flow-lib/src/config/client.rs b/lib/flow-lib/src/config/client.rs index 71f61805..7e30697a 100644 --- a/lib/flow-lib/src/config/client.rs +++ b/lib/flow-lib/src/config/client.rs @@ -29,6 +29,8 @@ pub struct ClientConfig { pub call_depth: u32, #[serde(default = "default_origin")] pub origin: FlowRunOrigin, + #[serde(default)] + pub signers: JsonValue, } const fn default_origin() -> FlowRunOrigin {