Skip to content

Commit

Permalink
flow_run.signers
Browse files Browse the repository at this point in the history
  • Loading branch information
juchiast committed Nov 17, 2023
1 parent 37f0c7d commit 3abe2f2
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 13 deletions.
6 changes: 4 additions & 2 deletions crates/db/src/connection/conn_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,14 +256,15 @@ impl UserConnection {
nodes,
edges,
collect_instructions,
partial_config)
partial_config,
signers)
VALUES (
gen_random_uuid(),
$1, $2,
jsonb_build_object('M', $3::JSONB),
$4, $5,
jsonb_build_object('SOL', $6::JSONB),
$7, $8, $9, $10, $11, $12)
$7, $8, $9, $10, $11, $12, $13)
RETURNING id",
)
.await
Expand Down Expand Up @@ -294,6 +295,7 @@ impl UserConnection {
&config.edges.iter().map(Json).collect::<Vec<_>>(),
&config.collect_instructions,
&config.partial_config.as_ref().map(Json),
&Json(&config.signers),
],
)
.await
Expand Down
22 changes: 19 additions & 3 deletions crates/flow-server/src/db_worker/signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use flow_lib::{
};
use futures_util::FutureExt;
use hashbrown::{hash_map::Entry, HashMap};
use serde_json::Value as JsonValue;
use solana_sdk::{pubkey::Pubkey, signature::Keypair, signer::Signer};
use std::future::ready;

Expand Down Expand Up @@ -118,12 +119,27 @@ impl SignerWorker {
Ok(Self { signers })
}

pub async fn fetch_and_start<'a, I>(db: DbPool, users: I) -> Result<actix::Addr<Self>, DbError>
pub async fn fetch_and_start<'a, I>(
db: DbPool,
users: I,
) -> Result<(actix::Addr<Self>, JsonValue), DbError>
where
I: IntoIterator<Item = &'a (UserId, actix::Recipient<SignatureRequest>)>,
{
let signer = Self::fetch(db, users).await?;
tracing::debug!("using signers: {:#?}", signer.signers);
Ok(signer.start())
let signers_info = signer
.signers
.iter()
.map(|(pk, w)| {
(
pk.to_string(),
match w {
SignerType::Keypair(_) => "HARDCODED".to_owned(),
SignerType::UserWallet { user_id, .. } => user_id.to_string(),
},
)
})
.collect::<JsonValue>();
Ok((signer.start(), signers_info.into()))

Check warning on line 143 in crates/flow-server/src/db_worker/signer.rs

View workflow job for this annotation

GitHub Actions / clippy

useless conversion to the same type: `serde_json::Value`

warning: useless conversion to the same type: `serde_json::Value` --> crates/flow-server/src/db_worker/signer.rs:143:29 | 143 | Ok((signer.start(), signers_info.into())) | ^^^^^^^^^^^^^^^^^^^ help: consider removing `.into()`: `signers_info` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#useless_conversion = note: `#[warn(clippy::useless_conversion)]` on by default

Check warning on line 143 in crates/flow-server/src/db_worker/signer.rs

View workflow job for this annotation

GitHub Actions / clippy

useless conversion to the same type: `serde_json::Value`

warning: useless conversion to the same type: `serde_json::Value` --> crates/flow-server/src/db_worker/signer.rs:143:29 | 143 | Ok((signer.start(), signers_info.into())) | ^^^^^^^^^^^^^^^^^^^ help: consider removing `.into()`: `signers_info` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#useless_conversion = note: `#[warn(clippy::useless_conversion)]` on by default
}
}
8 changes: 4 additions & 4 deletions crates/flow-server/src/db_worker/user_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,14 +486,14 @@ impl actix::Handler<StartFlowFresh> for UserWorker {
})
.await??;

let signer =
let (signer, signers_info) =
SignerWorker::fetch_and_start(db, &[(user_id, addr.clone().recipient())]).await?;

let r = FlowRegistry::from_actix(
msg.user,
Vec::new(),
msg.flow_id,
signer.recipient(),
(signer.recipient(), signers_info),
addr.clone().recipient(),
addr.clone().recipient(),
addr.clone().recipient(),
Expand Down Expand Up @@ -562,7 +562,7 @@ impl actix::Handler<StartFlowShared> for UserWorker {
})
.await??;

let signer = SignerWorker::fetch_and_start(
let (signer, signers_info) = SignerWorker::fetch_and_start(
db,
&[
(msg.started_by.0, msg.started_by.1.recipient()),
Expand All @@ -575,7 +575,7 @@ impl actix::Handler<StartFlowShared> for UserWorker {
User { id: user_id },
[msg.started_by.0].into(),
msg.flow_id,
signer.recipient(),
(signer.recipient(), signers_info),
addr.clone().recipient(),
addr.clone().recipient(),
addr.clone().recipient(),
Expand Down
14 changes: 11 additions & 3 deletions crates/flow/src/flow_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use flow_lib::{
CommandType, FlowConfig, FlowId, FlowRunId, NodeId, UserId, ValueSet,
};
use hashbrown::HashMap;
use serde_json::Value as JsonValue;
use std::sync::Arc;
use thiserror::Error as ThisError;
use tracing::instrument::WithSubscriber;
Expand All @@ -35,6 +36,7 @@ pub struct FlowRegistry {
user: User,
shared_with: Vec<UserId>,
flows: Arc<HashMap<FlowId, ClientConfig>>,
signers_info: JsonValue,
endpoints: Endpoints,
signer: signer::Svc,
token: get_jwt::Svc,
Expand All @@ -54,6 +56,7 @@ impl Default for FlowRegistry {
shared_with: <_>::default(),
flows: Arc::new(HashMap::new()),
endpoints: <_>::default(),
signers_info: <_>::default(),
signer,
token,
new_flow_run,
Expand Down Expand Up @@ -121,7 +124,7 @@ impl FlowRegistry {
user: User,
shared_with: Vec<UserId>,
entrypoint: FlowId,
signer: signer::Svc,
(signer, signers_info): (signer::Svc, JsonValue),
new_flow_run: new_flow_run::Svc,
get_flow: get_flow::Svc,
get_previous_values: get_previous_values::Svc,
Expand All @@ -136,6 +139,7 @@ impl FlowRegistry {
shared_with,
flows: Arc::new(flows),
signer,
signers_info,
new_flow_run,
get_previous_values,
token,
Expand All @@ -147,7 +151,7 @@ impl FlowRegistry {
user: User,
shared_with: Vec<UserId>,
entrypoint: FlowId,
signer: actix::Recipient<signer::SignatureRequest>,
(signer, signers_info): (actix::Recipient<signer::SignatureRequest>, JsonValue),
new_flow_run: actix::Recipient<new_flow_run::Request>,
get_flow: actix::Recipient<get_flow::Request>,
get_previous_values: actix::Recipient<get_previous_values::Request>,
Expand All @@ -159,7 +163,10 @@ impl FlowRegistry {
user,
shared_with,
entrypoint,
TowerClient::from_service(ActixService::from(signer), signer::Error::Worker, 16),
(
TowerClient::from_service(ActixService::from(signer), signer::Error::Worker, 16),
signers_info,
),
TowerClient::from_service(
ActixService::from(new_flow_run),
new_flow_run::Error::Worker,
Expand Down Expand Up @@ -224,6 +231,7 @@ impl FlowRegistry {
} else {
config.instructions_bundling.clone()
},
signers: self.signers_info.clone(),
..config.clone()
},
inputs: inputs.clone(),
Expand Down
2 changes: 1 addition & 1 deletion lib/flow-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ uuid = { version = "1.0.0", features = ["v4", "serde"] }
serde = { version = "1.0.137", features = ["derive"] }
serde_json = "1"
serde_with = "2.2"
tower = { version = "0.4", features = ["buffer", "util"] }
tower = { version = "0.4", features = ["buffer", "util", "retry"] }
tracing = "0.1"
pin-project-lite = "0.2"
actix = "0.13"
Expand Down
2 changes: 2 additions & 0 deletions lib/flow-lib/src/config/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub struct ClientConfig {
pub call_depth: u32,
#[serde(default = "default_origin")]
pub origin: FlowRunOrigin,
#[serde(default)]
pub signers: JsonValue,
}

const fn default_origin() -> FlowRunOrigin {
Expand Down

0 comments on commit 3abe2f2

Please sign in to comment.