Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Patches #10

Merged
merged 6 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions crates/db/src/connection/conn_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl UserConnection {
pub async fn get_wallets(&self) -> crate::Result<Vec<Wallet>> {
let stmt = self
.conn
.prepare_cached("SELECT public_key, keypair FROM wallets WHERE user_id = $1")
.prepare_cached("SELECT public_key, keypair, id FROM wallets WHERE user_id = $1")
.await
.map_err(Error::exec("prepare get_wallets"))?;
self.conn
Expand All @@ -88,7 +88,13 @@ impl UserConnection {
.transpose()
.map_err(Error::parsing("wallets.keypair"))?;

Ok(Wallet { pubkey, keypair })
let id = r.try_get(2).map_err(Error::data("wallets.id"))?;

Ok(Wallet {
id,
pubkey,
keypair,
})
})
.collect()
}
Expand Down Expand Up @@ -250,14 +256,15 @@ impl UserConnection {
nodes,
edges,
collect_instructions,
partial_config)
partial_config,
signers)
VALUES (
gen_random_uuid(),
$1, $2,
jsonb_build_object('M', $3::JSONB),
$4, $5,
jsonb_build_object('SOL', $6::JSONB),
$7, $8, $9, $10, $11, $12)
$7, $8, $9, $10, $11, $12, $13)
RETURNING id",
)
.await
Expand Down Expand Up @@ -288,6 +295,7 @@ impl UserConnection {
&config.edges.iter().map(Json).collect::<Vec<_>>(),
&config.collect_instructions,
&config.partial_config.as_ref().map(Json),
&Json(&config.signers),
],
)
.await
Expand Down
1 change: 1 addition & 0 deletions crates/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub struct FlowRunLogsRow {

#[derive(Serialize, Deserialize)]
pub struct Wallet {
pub id: i64,
#[serde(with = "utils::serde_bs58")]
pub pubkey: [u8; 32],
#[serde(
Expand Down
96 changes: 78 additions & 18 deletions crates/flow-server/src/db_worker/signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,35 @@
UserId,
};
use futures_util::FutureExt;
use hashbrown::HashMap;
use hashbrown::{hash_map::Entry, HashMap};
use serde_json::Value as JsonValue;
use solana_sdk::{pubkey::Pubkey, signature::Keypair, signer::Signer};
use std::future::ready;

pub enum SignerType {
Keypair(Keypair),
Keypair(Box<Keypair>),
UserWallet {
// Forward to UserWorker
user_id: UserId,
sender: actix::Recipient<SignatureRequest>,
},
}

impl std::fmt::Debug for SignerType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Keypair(k) => f
.debug_tuple("SignerType::Keypair")
.field(&k.pubkey())
.finish(),
Self::UserWallet { user_id, .. } => f
.debug_tuple("SignerType::UserWallet")
.field(&user_id)
.finish(),
}
}
}

pub struct SignerWorker {
pub signers: HashMap<Pubkey, SignerType>,
}
Expand All @@ -35,7 +52,7 @@
signature: keypair.sign_message(&msg.message),
}))
.boxed(),
Some(SignerType::UserWallet { sender }) => {
Some(SignerType::UserWallet { sender, .. }) => {
let fut = sender.send(msg);
async move { fut.await? }.boxed()
}
Expand All @@ -44,42 +61,85 @@
}

impl SignerWorker {
pub async fn fetch_and_start<'a, I>(db: DbPool, users: I) -> Result<actix::Addr<Self>, DbError>
pub async fn fetch<'a, I>(db: DbPool, users: I) -> Result<Self, DbError>
where
I: IntoIterator<Item = &'a (UserId, actix::Recipient<SignatureRequest>)>,
{
let mut signers = HashMap::new();
for (user, sender) in users {
let conn = db.get_user_conn(*user).await?;

for (user_id, sender) in users {
let conn = db.get_user_conn(*user_id).await?;
let wallets = conn.get_wallets().await?;
for w in wallets {
let pk = Pubkey::new_from_array(w.pubkey);
if !pk.is_on_curve() {
tracing::warn!("invalid wallet: pubkey is not on curve");
tracing::warn!("invalid wallet: pubkey is not on curve; id={}", w.id);
continue;
}
let s = match w.keypair {
None => SignerType::UserWallet {
user_id: *user_id,
sender: sender.clone(),
},
Some(keypair) => {
let keypair = Keypair::from_bytes(&keypair).ok().and_then(|k| {
let pubkey: ed25519_dalek::PublicKey = k.secret().into();
(k.pubkey().to_bytes() == pubkey.to_bytes())
.then_some(SignerType::Keypair(k))
});
match keypair {
None => {
tracing::warn!("invalid keypair: mismatch");
let keypair = match Keypair::from_bytes(&keypair) {
Ok(keypair) => keypair,
Err(error) => {
tracing::warn!("invalid keypair: {}; id={}", error, w.id);
continue;
}
Some(signer) => signer,
};
// check to prevent https://github.com/advisories/GHSA-w5vr-6qhr-36cc
if keypair.pubkey().to_bytes()
!= ed25519_dalek::PublicKey::from(keypair.secret()).to_bytes()
{
tracing::warn!("invalid keypair: mismatch; id={}", w.id);
continue;
}
SignerType::Keypair(Box::new(keypair))
}
};
signers.insert(pk, s);
match signers.entry(pk) {
Entry::Vacant(slot) => {
slot.insert(s);
}
Entry::Occupied(mut slot) => {
if matches!(
(slot.get(), &s),
(SignerType::UserWallet { .. }, SignerType::Keypair(_))
) {
tracing::warn!("replacing wallet {}", pk);
slot.insert(s);
}
}
}
}
}
Ok(Self { signers }.start())

Ok(Self { signers })
}

pub async fn fetch_and_start<'a, I>(
db: DbPool,
users: I,
) -> Result<(actix::Addr<Self>, JsonValue), DbError>
where
I: IntoIterator<Item = &'a (UserId, actix::Recipient<SignatureRequest>)>,
{
let signer = Self::fetch(db, users).await?;
let signers_info = signer
.signers
.iter()
.map(|(pk, w)| {
(
pk.to_string(),
match w {
SignerType::Keypair(_) => "HARDCODED".to_owned(),
SignerType::UserWallet { user_id, .. } => user_id.to_string(),
},
)
})
.collect::<JsonValue>();
Ok((signer.start(), signers_info.into()))

Check warning on line 143 in crates/flow-server/src/db_worker/signer.rs

View workflow job for this annotation

GitHub Actions / clippy

useless conversion to the same type: `serde_json::Value`

warning: useless conversion to the same type: `serde_json::Value` --> crates/flow-server/src/db_worker/signer.rs:143:29 | 143 | Ok((signer.start(), signers_info.into())) | ^^^^^^^^^^^^^^^^^^^ help: consider removing `.into()`: `signers_info` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#useless_conversion = note: `#[warn(clippy::useless_conversion)]` on by default

Check warning on line 143 in crates/flow-server/src/db_worker/signer.rs

View workflow job for this annotation

GitHub Actions / clippy

useless conversion to the same type: `serde_json::Value`

warning: useless conversion to the same type: `serde_json::Value` --> crates/flow-server/src/db_worker/signer.rs:143:29 | 143 | Ok((signer.start(), signers_info.into())) | ^^^^^^^^^^^^^^^^^^^ help: consider removing `.into()`: `signers_info` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#useless_conversion = note: `#[warn(clippy::useless_conversion)]` on by default
}
}
38 changes: 35 additions & 3 deletions crates/flow-server/src/db_worker/token_worker.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
api::{apikey_info, claim_token},
auth::X_API_KEY,
user::{PasswordLogin, SupabaseError},
user::PasswordLogin,
};
use actix::{Actor, ActorFutureExt, Addr, AsyncContext, ResponseFuture, WrapFuture};
use actix_web::http::StatusCode;
Expand Down Expand Up @@ -178,13 +178,25 @@ impl Actor for TokenWorker {
}
}

#[derive(Deserialize, Debug)]
struct GoTrueError {
error: String,
error_description: String,
}

async fn supabase_error(resp: reqwest::Response) -> get_jwt::Error {
let bytes = match resp.bytes().await {
Ok(bytes) => bytes,
Err(error) => return get_jwt::Error::other(error),
};
match serde_json::from_slice::<SupabaseError>(&bytes) {
Ok(error) => get_jwt::Error::other(error.message),
match serde_json::from_slice::<GoTrueError>(&bytes) {
Ok(GoTrueError {
error,
error_description,
}) => get_jwt::Error::Supabase {
error,
error_description,
},
Err(_) => get_jwt::Error::other(String::from_utf8_lossy(&bytes)),
}
}
Expand Down Expand Up @@ -405,3 +417,23 @@ pub async fn token_from_apikeys(
.collect::<HashMap<UserId, get_jwt::Svc>>();
(services, actors)
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn need_key_refresh() {
let error = refresh(
"Hello".to_owned(),
Endpoints {
flow_server: String::new(),
supabase: "https://base.spaceoperator.com".to_owned(),
supabase_anon_key: std::env::var("ANON_KEY").unwrap(),
},
)
.await
.unwrap_err();
dbg!(error);
}
}
21 changes: 15 additions & 6 deletions crates/flow-server/src/db_worker/user_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ impl ResponseError for StartError {
get_jwt::Error::WrongRecipient
| get_jwt::Error::Worker(_)
| get_jwt::Error::MailBox(_)
| get_jwt::Error::Supabase { .. }
| get_jwt::Error::Other(_) => StatusCode::INTERNAL_SERVER_ERROR,
},
StartError::Mailbox(_) => StatusCode::INTERNAL_SERVER_ERROR,
Expand Down Expand Up @@ -485,14 +486,14 @@ impl actix::Handler<StartFlowFresh> for UserWorker {
})
.await??;

let signer =
let (signer, signers_info) =
SignerWorker::fetch_and_start(db, &[(user_id, addr.clone().recipient())]).await?;

let r = FlowRegistry::from_actix(
msg.user,
Vec::new(),
msg.flow_id,
signer.recipient(),
(signer.recipient(), signers_info),
addr.clone().recipient(),
addr.clone().recipient(),
addr.clone().recipient(),
Expand Down Expand Up @@ -561,11 +562,11 @@ impl actix::Handler<StartFlowShared> for UserWorker {
})
.await??;

let signer = SignerWorker::fetch_and_start(
let (signer, signers_info) = SignerWorker::fetch_and_start(
db,
&[
(user_id, addr.clone().recipient()),
(msg.started_by.0, msg.started_by.1.recipient()),
(user_id, addr.clone().recipient()),
],
)
.await?;
Expand All @@ -574,7 +575,7 @@ impl actix::Handler<StartFlowShared> for UserWorker {
User { id: user_id },
[msg.started_by.0].into(),
msg.flow_id,
signer.recipient(),
(signer.recipient(), signers_info),
addr.clone().recipient(),
addr.clone().recipient(),
addr.clone().recipient(),
Expand All @@ -585,7 +586,15 @@ impl actix::Handler<StartFlowShared> for UserWorker {
.await?;

let run_id = r
.start(msg.flow_id, msg.input, None, false, FlowRunOrigin::Start {})
.start(
msg.flow_id,
msg.input,
None,
false,
FlowRunOrigin::StartShared {
started_by: msg.started_by.0,
},
)
.await?
.0;

Expand Down
Loading
Loading