diff --git a/crates/flow-server/src/db_worker/flow_run_worker.rs b/crates/flow-server/src/db_worker/flow_run_worker.rs index 1b20b718..2f0f362a 100644 --- a/crates/flow-server/src/db_worker/flow_run_worker.rs +++ b/crates/flow-server/src/db_worker/flow_run_worker.rs @@ -103,7 +103,7 @@ impl actix::Handler for FlowRunWorker { type Result = Result<(SubscriptionID, Vec), SubscribeError>; fn handle(&mut self, msg: SubscribeEvents, _: &mut Self::Context) -> Self::Result { - if msg.user_id != self.user_id && self.shared_with.contains(&msg.user_id) { + if msg.user_id != self.user_id && !self.shared_with.contains(&msg.user_id) { return Err(SubscribeError::Unauthorized); } msg.receiver diff --git a/crates/flow-server/src/db_worker/user_worker.rs b/crates/flow-server/src/db_worker/user_worker.rs index 2657862a..fe9a0e26 100644 --- a/crates/flow-server/src/db_worker/user_worker.rs +++ b/crates/flow-server/src/db_worker/user_worker.rs @@ -234,37 +234,6 @@ impl actix::Handler for UserWorker { } } -struct ShareFlowRun { - worker: actix::Recipient, - flow_owner: UserId, - share_with: UserId, - db: DbPool, -} - -impl Actor for ShareFlowRun { - type Context = actix::Context; -} - -impl actix::Handler for ShareFlowRun { - type Result = ResponseFuture>; - fn handle(&mut self, msg: new_flow_run::Request, _: &mut Self::Context) -> Self::Result { - let worker = self.worker.clone(); - let flow_owner = self.flow_owner; - let db = self.db.clone(); - let share_with = self.share_with; - Box::pin(async move { - let res = worker.send(msg).await??; - db.get_user_conn(flow_owner) - .await - .map_err(new_flow_run::Error::other)? - .share_flow_run(res.flow_run_id, share_with) - .await - .map_err(new_flow_run::Error::other)?; - Ok(res) - }) - } -} - impl actix::Handler for UserWorker { type Result = ResponseFuture>; @@ -278,14 +247,23 @@ impl actix::Handler for UserWorker { return Err(new_flow_run::Error::Unauthorized); } - let run_id = db + let conn = db .get_user_conn(user_id) .await - .map_err(new_flow_run::Error::other)? + .map_err(new_flow_run::Error::other)?; + let run_id = conn .new_flow_run(&msg.config, &msg.inputs) .await .map_err(new_flow_run::Error::other)?; + for id in &msg.shared_with { + if *id != user_id { + conn.share_flow_run(run_id, *id) + .await + .map_err(new_flow_run::Error::other)?; + } + } + let actor = FlowRunWorker::new( run_id, user_id,