Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
juchiast committed Dec 17, 2024
1 parent 36959c4 commit b4c529a
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 11 deletions.
2 changes: 1 addition & 1 deletion crates/cmds-std/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ serde_json = { workspace = true, features = ["raw_value"] }
anyhow = { workspace = true }
bs58 = { workspace = true }
thiserror = "1"
reqwest = "0.12"
reqwest = { version = "0.12", features = ["multipart"] }
futures-util = "0.3.29"
rust_decimal = { version = "1.32.0", features = ["serde-with-float"] }
tracing = "0.1.40"
Expand Down
16 changes: 16 additions & 0 deletions crates/flow-server/src/db_worker/user_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,22 @@ impl ResponseError for StartError {
| get_flow::Error::MailBox(_)
| get_flow::Error::Other(_) => StatusCode::INTERNAL_SERVER_ERROR,
},
flow::Error::GetFlowRow(e) => match e {
flow::flow_set::get_flow_row::Error::NotFound => StatusCode::NOT_FOUND,
flow::flow_set::get_flow_row::Error::Unauthorized => StatusCode::UNAUTHORIZED,
flow::flow_set::get_flow_row::Error::Worker(_)
| flow::flow_set::get_flow_row::Error::MailBox(_)
| flow::flow_set::get_flow_row::Error::Other(_) => {
StatusCode::INTERNAL_SERVER_ERROR
}
},
flow::Error::MakeSigner(e) => match e {
flow::flow_set::make_signer::Error::Worker(_)
| flow::flow_set::make_signer::Error::MailBox(_)
| flow::flow_set::make_signer::Error::Other(_) => {
StatusCode::INTERNAL_SERVER_ERROR
}
},
flow::Error::Cycle => StatusCode::BAD_REQUEST,
flow::Error::NeedOneTx => StatusCode::BAD_REQUEST,
flow::Error::NeedOneSignatureOutput => StatusCode::BAD_REQUEST,
Expand Down
22 changes: 13 additions & 9 deletions crates/flow/src/flow_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ use getset::Getters;
use hashbrown::HashMap;
use serde_json::Value as JsonValue;
use std::sync::{Arc, Mutex};
use tokio::sync::Semaphore;
use tokio::{sync::Semaphore, task::JoinHandle};

use crate::{
command::{interflow, interflow_instructions},
flow_graph::FlowRunResult,
flow_registry::{
get_flow, get_previous_values, new_flow_run, run_rhai, FlowRegistry, StartFlowOptions,
},
Expand Down Expand Up @@ -87,7 +88,7 @@ impl Flow {
#[derive(bon::Builder)]
pub struct FlowDeployment {
/// Owner of this deployment (and all flows belonging to it)
pub owner: UserId,
pub user_id: UserId,
/// Flow ID to start the set
pub entrypoint: FlowId,
/// Flow configs
Expand All @@ -106,7 +107,7 @@ impl FlowDeployment {
.entrypoint(flow.row.id)
.start_permission(flow.start_permission())
.wallets_id(flow.wallets_id())
.owner(flow.row.user_id)
.user_id(flow.row.user_id)
.flows([(flow.row.id, flow)].into())
.build()
}
Expand Down Expand Up @@ -247,7 +248,7 @@ impl FlowSet {
.call_mut(get_flow_row::Request { flow_id: id })
.await?
.row;
let flow = Flow::builder().row(row).output_instructions(false).build();
let flow = Flow::builder().row(row).build();
queue.extend(flow.interflows_id());
dep.flows.insert(id, flow);
}
Expand All @@ -264,7 +265,10 @@ impl FlowSet {
Ok(FlowSet { flows: dep, ctx })
}

pub async fn start(self, options: StartFlowDeploymentOptions) {
pub async fn start(
self,
options: StartFlowDeploymentOptions,
) -> Result<(FlowRunId, JoinHandle<FlowRunResult>), new_flow_run::Error> {
let flow_id = self.flows.entrypoint;
let flow = self.flows.flows.get(&flow_id).unwrap().clone();
let registry = FlowRegistry::builder()
Expand All @@ -276,10 +280,10 @@ impl FlowSet {
.collect(),
))
.flow_owner(User {
id: self.flows.owner,
id: self.flows.user_id,
})
.started_by(User {
id: self.flows.owner,
id: self.flows.user_id,
})
.shared_with(Vec::new())
.signers_info(JsonValue::Null)
Expand All @@ -300,7 +304,7 @@ impl FlowSet {
options.inputs,
StartFlowOptions {
partial_config: None,
collect_instructions: flow.output_instructions,
collect_instructions: false,
action_identity: None,
action_config: None,
fees: Vec::new(),
Expand All @@ -312,7 +316,7 @@ impl FlowSet {
parent_flow_execute: None,
},
)
.await;
.await
}
}

Expand Down
30 changes: 30 additions & 0 deletions docker/supabase/migrations/20241214133549_flow_deployment.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
CREATE TABLE flow_deployments (
user_id UUID NOT NULL,
id UUID NOT NULL,
entrypoint INTEGER NOT NULL,
start_permission TEXT NOT NULL
);

-- All wallets used in the deployment
CREATE TABLE flow_deployments_wallets (
user_id UUID NOT NULL,
deployment_id UUID NOT NULL,
wallet_id BIGINT NOT NULL
);

-- All flows used in the deployment
CREATE TABLE flow_deployments_flows (
user_id UUID NOT NULL,
deployment_id UUID NOT NULL,
flow_id INTEGER NOT NULL,
nodes JSONB [] NOT NULL,
edges JSONB [] NOT NULL,
environment JSONB NOT NULL,
current_network JSONB NOT NULL,
instructions_bundling JSONB NOT NULL,
is_public BOOLEAN NOT NULL,
start_shared BOOLEAN NOT NULL,
start_unverified BOOLEAN NOT NULL
);

CREATE TABLE flow_deployments_tags (flow_id INTEGER, tag TEXT, deployment_id UUID);
3 changes: 2 additions & 1 deletion lib/flow-lib/src/config/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ fn default_interflow_instruction_info() -> Result<InstructionInfo, String> {
Err("not available".to_string())
}

/// A row of `flows` table
/// A row of `flows` table and `flow_deployments_flows`
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FlowRow {
pub id: FlowId,
pub user_id: UserId,
pub nodes: Vec<Node>,
pub edges: Vec<Edge>,
// TODO: remove default
#[serde(default)]
#[serde_as(deserialize_as = "serde_with::DefaultOnNull")]
pub environment: HashMap<String, String>,
Expand Down

0 comments on commit b4c529a

Please sign in to comment.