diff --git a/fhevm-engine/coprocessor/src/tfhe_worker.rs b/fhevm-engine/coprocessor/src/tfhe_worker.rs index ab8d5842..61acc5bc 100644 --- a/fhevm-engine/coprocessor/src/tfhe_worker.rs +++ b/fhevm-engine/coprocessor/src/tfhe_worker.rs @@ -1,9 +1,6 @@ use crate::{db_queries::populate_cache_with_tenant_keys, types::TfheTenantKeys}; use fhevm_engine_common::types::{FhevmError, Handle, SupportedFheCiphertexts}; -use fhevm_engine_common::{ - tfhe_ops::{current_ciphertext_version, perform_fhe_operation}, - types::SupportedFheOperations, -}; +use fhevm_engine_common::{tfhe_ops::current_ciphertext_version, types::SupportedFheOperations}; use itertools::Itertools; use lazy_static::lazy_static; use opentelemetry::trace::{Span, TraceContextExt, Tracer}; @@ -102,10 +99,9 @@ async fn tfhe_worker_cycle( let mut s = tracer.start_with_context("begin_transaction", &loop_ctx); let mut trx = conn.begin().await?; s.end(); - // This query locks our work items so other worker doesn't select them. let mut s = tracer.start_with_context("query_work_items", &loop_ctx); - let mut the_work = query!( + let the_work = query!( " WITH RECURSIVE dependent_computations(tenant_id, output_handle, dependencies, fhe_operation, is_scalar, produced_handles) AS ( SELECT c.tenant_id, c.output_handle, c.dependencies, c.fhe_operation, c.is_scalar, ARRAY[ROW(c.tenant_id, c.output_handle)] @@ -169,7 +165,6 @@ async fn tfhe_worker_cycle( if the_work.is_empty() { continue; } - WORK_ITEMS_FOUND_COUNTER.inc_by(the_work.len() as u64); info!(target: "tfhe_worker", { count = the_work.len() }, "Processing work items"); // Make sure we process each tenant independently to avoid @@ -184,12 +179,12 @@ async fn tfhe_worker_cycle( let key_cache = tenant_key_cache.read().await; for (tenant_id, work) in work_by_tenant.iter() { let _ = tenants_to_query.insert(*tenant_id); - if !key_cache.contains(&tenant_id) { + if !key_cache.contains(tenant_id) { let _ = keys_to_query.insert(*tenant_id); } for w in work.iter() { for dh in 
&w.dependencies { - let _ = cts_to_query.insert(&dh); + let _ = cts_to_query.insert(dh); } } } @@ -207,7 +202,6 @@ async fn tfhe_worker_cycle( )); populate_cache_with_tenant_keys(keys_to_query, trx.as_mut(), &tenant_key_cache).await?; s.end(); - let mut s = tracer.start_with_context("query_ciphertext_batch", &loop_ctx); s.set_attribute(KeyValue::new("cts_to_query", cts_to_query.len() as i64)); // TODO: select all the ciphertexts where they're contained in the tuples @@ -224,7 +218,6 @@ async fn tfhe_worker_cycle( .fetch_all(trx.as_mut()) .await?; s.end(); - // index ciphertexts in hashmap let mut ciphertext_map: HashMap<(i32, &[u8]), _> = HashMap::with_capacity(ciphertexts_rows.len()); @@ -232,13 +225,11 @@ async fn tfhe_worker_cycle( let _ = ciphertext_map.insert((row.tenant_id, &row.handle), row); } - // TODO-ap - let mut s = tracer.start_with_context("schedule_fhe_work", &loop_ctx); - s.set_attribute(KeyValue::new("work_items", work_by_tenant.len() as i64)); - - let mut s_outer = tracer.start_with_context("wait_and_update_fhe_work", &loop_ctx); // Process tenants in sequence to avoid switching keys during execution for (tenant_id, work) in work_by_tenant.iter() { + let mut s_schedule = tracer.start_with_context("schedule_fhe_work", &loop_ctx); + s_schedule.set_attribute(KeyValue::new("work_items", work.len() as i64)); + s_schedule.set_attribute(KeyValue::new("tenant_id", *tenant_id as i64)); // We need to ensure that no handles are missing from // either DB inputs or values produced within this batch // before this batch is scheduled. 
@@ -282,6 +273,7 @@ async fn tfhe_worker_cycle( let mut producer_indexes: HashMap<&Handle, usize> = HashMap::new(); let mut consumer_indexes: HashMap = HashMap::new(); 'work_items: for (widx, w) in work.iter().enumerate() { + let mut s = tracer.start_with_context("tfhe_computation", &loop_ctx); let fhe_op: SupportedFheOperations = w .fhe_operation .try_into() @@ -305,16 +297,11 @@ async fn tfhe_worker_cycle( } else { // If this cannot be computed, we need to // exclude it from the DF graph. - println!("Uncomputable found"); uncomputable.insert(widx, ()); continue 'work_items; } } - // copy for setting error in database - let mut s = tracer.start_with_context("tfhe_computation", &loop_ctx); - - // TODO-ap let n = graph.add_node( w.output_handle.clone(), w.fhe_operation.into(), @@ -328,7 +315,6 @@ async fn tfhe_worker_cycle( "handle", format!("0x{}", hex::encode(&w.output_handle)), )); - //s.set_attribute(KeyValue::new("output_type", db_type as i64)); let input_types = input_ciphertexts .iter() .map(|i| match i { @@ -362,34 +348,23 @@ async fn tfhe_worker_cycle( } } } + s_schedule.end(); // Execute the DFG with the current tenant's keys + let mut s_outer = tracer.start_with_context("wait_and_update_fhe_work", &loop_ctx); { let mut rk = tenant_key_cache.write().await; let keys = rk.get(tenant_id).expect("Can't get tenant key from cache"); - // TODO-ap // Schedule computations in parallel as dependences allow let mut sched = Scheduler::new(&mut graph.graph, args.coprocessor_fhe_threads); - let now = std::time::SystemTime::now(); - sched.schedule(keys.sks.clone()).await.map_err(|_| { - let err: Box = - Box::new(FhevmError::BadInputs); - error!(target: "tfhe_worker", - { error = err }, - "error while processing work item" - ) - }); - println!( - "GRAPH Execution time (sched): {}", - now.elapsed().unwrap().as_millis() - ); + sched.schedule(keys.sks.clone()).await?; } // Extract the results from the graph let res = graph.get_results().unwrap(); - // TODO-ap filter out 
computations that could not complete for (idx, w) in work.iter().enumerate() { + // Filter out computations that could not complete if uncomputable.contains_key(&idx) { continue; } @@ -478,9 +453,9 @@ async fn tfhe_worker_cycle( } } } + s_outer.end(); } s.end(); - s_outer.end(); trx.commit().await?; diff --git a/fhevm-engine/coprocessor/src/types.rs b/fhevm-engine/coprocessor/src/types.rs index 66994065..9bc7e42a 100644 --- a/fhevm-engine/coprocessor/src/types.rs +++ b/fhevm-engine/coprocessor/src/types.rs @@ -1,10 +1,12 @@ use std::sync::Arc; use fhevm_engine_common::types::FhevmError; +use scheduler::dfg::types::SchedulerError; #[derive(Debug)] pub enum CoprocessorError { DbError(sqlx::Error), + SchedulerError(SchedulerError), Unauthorized, FhevmError(FhevmError), DuplicateOutputHandleInBatch(String), @@ -60,6 +62,9 @@ impl std::fmt::Display for CoprocessorError { Self::DbError(dbe) => { write!(f, "Coprocessor db error: {:?}", dbe) } + Self::SchedulerError(se) => { + write!(f, "Coprocessor scheduler error: {:?}", se) + } Self::Unauthorized => { write!(f, "API key unknown/invalid/not provided") } @@ -169,6 +174,12 @@ impl From for CoprocessorError { } } +impl From for CoprocessorError { + fn from(err: SchedulerError) -> Self { + CoprocessorError::SchedulerError(err) + } +} + impl From for tonic::Status { fn from(err: CoprocessorError) -> Self { tonic::Status::from_error(Box::new(err)) diff --git a/fhevm-engine/executor/src/server.rs b/fhevm-engine/executor/src/server.rs index 5fdb1a9e..68aa2699 100644 --- a/fhevm-engine/executor/src/server.rs +++ b/fhevm-engine/executor/src/server.rs @@ -335,9 +335,9 @@ pub fn run_computation( } } -pub fn build_taskgraph_from_request<'a, 'b>( - dfg: &'a mut DFGraph<'b>, - req: &'b SyncComputeRequest, +pub fn build_taskgraph_from_request( + dfg: &mut DFGraph, + req: &SyncComputeRequest, state: &ComputationState, ) -> Result<(), SyncComputeError> { let mut produced_handles: HashMap<&Handle, usize> = HashMap::new(); diff --git 
a/fhevm-engine/scheduler/src/dfg.rs b/fhevm-engine/scheduler/src/dfg.rs index b22c6435..e0c5235b 100644 --- a/fhevm-engine/scheduler/src/dfg.rs +++ b/fhevm-engine/scheduler/src/dfg.rs @@ -3,16 +3,9 @@ pub mod types; use crate::dfg::types::*; use anyhow::Result; -use std::{cell::RefCell, collections::HashMap}; -// use executor::server::{ -// CompressedCiphertext, ComputationState, Input, SyncComputation, SyncComputeError, -// SyncComputeRequest, -// }; use daggy::{petgraph::graph::node_index, Dag, NodeIndex}; -use fhevm_engine_common::types::{ - FhevmError, Handle, SupportedFheCiphertexts, HANDLE_LEN, SCALAR_LEN, -}; -use tfhe::integer::U256; +use fhevm_engine_common::types::{Handle, SupportedFheCiphertexts}; +use std::cell::RefCell; thread_local! { pub static THREAD_POOL: RefCell> = const {RefCell::new(None)}; @@ -31,26 +24,25 @@ impl std::fmt::Debug for OpNode { f.debug_struct("OpNode") .field("OP", &self.opcode) .field( - "Result", - &format_args!("{0:?} (0x{0:X})", &self.result_handle[0]), + "Result handle", + &format_args!("{:02X?}", &self.result_handle), ) .finish() } } #[derive(Default, Debug)] -pub struct DFGraph<'a> { +pub struct DFGraph { pub graph: Dag, - produced_handles: HashMap<&'a Handle, NodeIndex>, } -impl<'a> DFGraph<'a> { +impl DFGraph { pub fn add_node( &mut self, rh: Handle, opcode: i32, inputs: Vec, - ) -> Result { + ) -> Result { Ok(self.graph.add_node(OpNode { opcode, result: None, @@ -63,7 +55,7 @@ impl<'a> DFGraph<'a> { source: usize, destination: usize, consumer_input: usize, - ) -> Result<(), SchedulerError> { + ) -> Result<()> { let consumer_index = node_index(destination); self.graph[consumer_index].inputs[consumer_input] = DFGTaskInput::Dependence(Some(source)); let _edge = self @@ -73,20 +65,20 @@ impl<'a> DFGraph<'a> { node_index(destination), consumer_input as u8, ) - .map_err(|_| SchedulerError::SchedulerError)?; + .map_err(|_| SchedulerError::CyclicDependence)?; Ok(()) } pub fn get_results( &mut self, - ) -> Result))>, 
SchedulerError> { + ) -> Result))>> { let mut res = Vec::with_capacity(self.graph.node_count()); for index in 0..self.graph.node_count() { let node = self.graph.node_weight_mut(NodeIndex::new(index)).unwrap(); if let Some(ct) = &node.result { res.push((node.result_handle.clone(), ct.clone())); } else { - return Err(SchedulerError::SchedulerError); + return Err(SchedulerError::DataflowGraphError.into()); } } Ok(res) diff --git a/fhevm-engine/scheduler/src/dfg/scheduler.rs b/fhevm-engine/scheduler/src/dfg/scheduler.rs index 8ed879d7..79330481 100644 --- a/fhevm-engine/scheduler/src/dfg/scheduler.rs +++ b/fhevm-engine/scheduler/src/dfg/scheduler.rs @@ -1,32 +1,26 @@ -use std::borrow::Borrow; -use std::collections::HashMap; -use std::sync::atomic::AtomicUsize; - -use crate::dfg::types::*; -use crate::dfg::{OpEdge, OpNode, THREAD_POOL}; +use crate::dfg::{types::*, OpEdge, OpNode, THREAD_POOL}; use anyhow::Result; -use daggy::petgraph::csr::IndexType; -use daggy::petgraph::graph::node_index; -use daggy::petgraph::visit::{IntoEdgeReferences, IntoNeighbors, VisitMap, Visitable}; -use daggy::petgraph::Direction::Incoming; - -use fhevm_engine_common::{ - common::FheOperation, - keys::{FhevmKeys, SerializedFhevmKeys}, - tfhe_ops::{current_ciphertext_version, perform_fhe_operation, try_expand_ciphertext_list}, - types::{get_ct_type, FhevmError, Handle, SupportedFheCiphertexts, HANDLE_LEN, SCALAR_LEN}, -}; - -use rayon::prelude::*; -use std::sync::mpsc::channel; - use daggy::{ petgraph::{ - visit::{EdgeRef, IntoEdgesDirected}, + csr::IndexType, + graph::node_index, + visit::{ + EdgeRef, IntoEdgeReferences, IntoEdgesDirected, IntoNeighbors, VisitMap, Visitable, + }, Direction, + Direction::Incoming, }, Dag, NodeIndex, }; +use fhevm_engine_common::{ + common::FheOperation, tfhe_ops::perform_fhe_operation, types::SupportedFheCiphertexts, +}; +use rayon::prelude::*; +use std::{ + borrow::Borrow, + collections::HashMap, + sync::{atomic::AtomicUsize, mpsc::channel}, +}; use 
tokio::task::JoinSet; struct ExecNode { @@ -83,7 +77,7 @@ impl<'a> Scheduler<'a> { } } - pub async fn schedule(&mut self, server_key: tfhe::ServerKey) -> Result<(), SchedulerError> { + pub async fn schedule(&mut self, server_key: tfhe::ServerKey) -> Result<()> { let schedule_type = std::env::var("FHEVM_DF_SCHEDULE"); match schedule_type { Ok(val) => match val.as_str() { @@ -103,13 +97,9 @@ impl<'a> Scheduler<'a> { } } - async fn schedule_fine_grain( - &mut self, - server_key: tfhe::ServerKey, - ) -> Result<(), SchedulerError> { - let mut set: JoinSet< - Result<(usize, (SupportedFheCiphertexts, i16, Vec)), SchedulerError>, - > = JoinSet::new(); + async fn schedule_fine_grain(&mut self, server_key: tfhe::ServerKey) -> Result<()> { + let mut set: JoinSet))>> = + JoinSet::new(); tfhe::set_server_key(server_key.clone()); // Prime the scheduler with all nodes without dependences for idx in 0..self.graph.node_count() { @@ -118,17 +108,15 @@ impl<'a> Scheduler<'a> { let node = self.graph.node_weight_mut(index).unwrap(); if Self::is_ready(node) { let opcode = node.opcode; - let inputs: Result, SchedulerError> = node + let inputs: Result> = node .inputs .iter() .map(|i| match i { DFGTaskInput::Value(i) => Ok(i.clone()), DFGTaskInput::Compressed((t, c)) => { - Ok(SupportedFheCiphertexts::decompress(*t, c) - .map_err(|_| SchedulerError::SchedulerError)?) 
+ SupportedFheCiphertexts::decompress(*t, c) } - - _ => Err(SchedulerError::SchedulerError), + _ => Err(SchedulerError::UnsatisfiedDependence.into()), }) .collect(); set.spawn_blocking(move || { @@ -139,7 +127,7 @@ impl<'a> Scheduler<'a> { } // Get results from computations and update dependences of remaining computations while let Some(result) = set.join_next().await { - let output = result.map_err(|_| SchedulerError::SchedulerError)??; + let output = result??; let index = output.0; let node_index = NodeIndex::new(index); // Satisfy deps from the executed task @@ -151,16 +139,15 @@ impl<'a> Scheduler<'a> { DFGTaskInput::Value(output.1 .0.clone()); if Self::is_ready(child_node) { let opcode = child_node.opcode; - let inputs: Result, SchedulerError> = child_node + let inputs: Result> = child_node .inputs .iter() .map(|i| match i { DFGTaskInput::Value(i) => Ok(i.clone()), DFGTaskInput::Compressed((t, c)) => { - Ok(SupportedFheCiphertexts::decompress(*t, c) - .map_err(|_| SchedulerError::SchedulerError)?) 
+ SupportedFheCiphertexts::decompress(*t, c) } - _ => Err(SchedulerError::SchedulerError), + _ => Err(SchedulerError::UnsatisfiedDependence.into()), }) .collect(); set.spawn_blocking(move || { @@ -178,16 +165,13 @@ impl<'a> Scheduler<'a> { &mut self, strategy: PartitionStrategy, server_key: tfhe::ServerKey, - ) -> Result<(), SchedulerError> { + ) -> Result<()> { tfhe::set_server_key(server_key.clone()); let mut set: JoinSet< - Result< - ( - Vec<(usize, (SupportedFheCiphertexts, i16, Vec))>, - NodeIndex, - ), - SchedulerError, - >, + Result<( + Vec<(usize, (SupportedFheCiphertexts, i16, Vec))>, + NodeIndex, + )>, > = JoinSet::new(); let mut execution_graph: Dag = Dag::default(); let _ = match strategy { @@ -221,7 +205,7 @@ impl<'a> Scheduler<'a> { } // Get results from computations and update dependences of remaining computations while let Some(result) = set.join_next().await { - let mut output = result.map_err(|_| SchedulerError::SchedulerError)??; + let mut output = result??; let task_index = output.1; while let Some(o) = output.0.pop() { let index = o.0; @@ -264,10 +248,7 @@ impl<'a> Scheduler<'a> { Ok(()) } - async fn schedule_component_loop( - &mut self, - server_key: tfhe::ServerKey, - ) -> Result<(), SchedulerError> { + async fn schedule_component_loop(&mut self, server_key: tfhe::ServerKey) -> Result<()> { let mut execution_graph: Dag = Dag::default(); let _ = partition_components(self.graph, &mut execution_graph); let mut comps = vec![]; @@ -304,7 +285,7 @@ impl<'a> Scheduler<'a> { }); let results: Vec<_> = dest.iter().collect(); for result in results { - let mut output = result.map_err(|_| SchedulerError::SchedulerError)?; + let mut output = result?; while let Some(o) = output.0.pop() { let index = o.0; let node_index = NodeIndex::new(index); @@ -319,17 +300,17 @@ fn add_execution_depedences( graph: &Dag, execution_graph: &mut Dag, node_map: HashMap, -) -> Result<(), SchedulerError> { +) -> Result<()> { // Once the DFG is partitioned, we need to add 
dependences as // edges in the execution graph for edge in graph.edge_references() { let (xsrc, xdst) = ( node_map .get(&edge.source()) - .ok_or(SchedulerError::SchedulerError)?, + .ok_or(SchedulerError::DataflowGraphError)?, node_map .get(&edge.target()) - .ok_or(SchedulerError::SchedulerError)?, + .ok_or(SchedulerError::DataflowGraphError)?, ); if xsrc != xdst && execution_graph.find_edge(*xsrc, *xdst).is_none() { let _ = execution_graph.add_edge(*xsrc, *xdst, ()); @@ -349,10 +330,10 @@ fn add_execution_depedences( fn partition_preserving_parallelism( graph: &Dag, execution_graph: &mut Dag, -) -> Result<(), SchedulerError> { +) -> Result<()> { // First sort the DAG in a schedulable order - let ts = - daggy::petgraph::algo::toposort(graph, None).map_err(|_| SchedulerError::SchedulerError)?; + let ts = daggy::petgraph::algo::toposort(graph, None) + .map_err(|_| SchedulerError::CyclicDependence)?; let mut vis = graph.visit_map(); let mut node_map = HashMap::new(); // Traverse the DAG and build a graph of connected components @@ -391,10 +372,10 @@ fn partition_preserving_parallelism( fn partition_components( graph: &Dag, execution_graph: &mut Dag, -) -> Result<(), SchedulerError> { +) -> Result<()> { // First sort the DAG in a schedulable order - let ts = - daggy::petgraph::algo::toposort(graph, None).map_err(|_| SchedulerError::SchedulerError)?; + let ts = daggy::petgraph::algo::toposort(graph, None) + .map_err(|_| SchedulerError::CyclicDependence)?; let tsmap: HashMap<&NodeIndex, usize> = ts.iter().enumerate().map(|(c, x)| (x, c)).collect(); let mut vis = graph.visit_map(); // Traverse the DAG and build a graph of the connected components @@ -430,19 +411,16 @@ fn partition_components( Ok(()) } -pub fn execute_partition( +fn execute_partition( computations: Vec<(i32, Vec, NodeIndex)>, task_id: NodeIndex, use_global_threadpool: bool, rayon_threads: usize, server_key: tfhe::ServerKey, -) -> Result< - ( - Vec<(usize, (SupportedFheCiphertexts, i16, Vec))>, - NodeIndex, 
- ), - SchedulerError, -> { +) -> Result<( + Vec<(usize, (SupportedFheCiphertexts, i16, Vec))>, + NodeIndex, +)> { let mut res: HashMap)> = HashMap::with_capacity(computations.len()); for (opcode, inputs, nidx) in computations { @@ -455,17 +433,14 @@ pub fn execute_partition( cts.push(ct.0.clone()); } } else { - return Err(SchedulerError::SchedulerError); + return Err(SchedulerError::UnsatisfiedDependence.into()); } } DFGTaskInput::Value(v) => { cts.push(v.clone()); } DFGTaskInput::Compressed((t, c)) => { - cts.push( - SupportedFheCiphertexts::decompress(*t, c) - .map_err(|_| SchedulerError::SchedulerError)?, - ); + cts.push(SupportedFheCiphertexts::decompress(*t, c)?); } } } @@ -488,7 +463,7 @@ pub fn execute_partition( thread_pool.broadcast(|_| { tfhe::set_server_key(server_key.clone()); }); - thread_pool.install(|| -> Result<(), SchedulerError> { + thread_pool.install(|| -> Result<()> { let (node_index, result) = run_computation(opcode, Ok(cts), nidx.index())?; res.insert(node_index, result); Ok(()) @@ -501,9 +476,9 @@ pub fn execute_partition( fn run_computation( operation: i32, - inputs: Result, SchedulerError>, + inputs: Result>, graph_node_index: usize, -) -> Result<(usize, (SupportedFheCiphertexts, i16, Vec)), SchedulerError> { +) -> Result<(usize, (SupportedFheCiphertexts, i16, Vec))> { let op = FheOperation::try_from(operation); match inputs { Ok(inputs) => match op { @@ -516,14 +491,10 @@ fn run_computation( let (ct_type, ct_bytes) = result.compress(); Ok((graph_node_index, (result.clone(), ct_type, ct_bytes))) } - Err(_) => Err::<(usize, (SupportedFheCiphertexts, i16, Vec)), SchedulerError>( - SchedulerError::SchedulerError, - ), + Err(e) => Err(e.into()), }, - _ => Err::<(usize, (SupportedFheCiphertexts, i16, Vec)), SchedulerError>( - SchedulerError::SchedulerError, - ), + _ => Err(SchedulerError::UnknownOperation(operation).into()), }, - Err(_) => Err(SchedulerError::SchedulerError), + Err(_) => Err(SchedulerError::InvalidInputs.into()), } } diff 
--git a/fhevm-engine/scheduler/src/dfg/types.rs b/fhevm-engine/scheduler/src/dfg/types.rs index 393729f8..23706de9 100644 --- a/fhevm-engine/scheduler/src/dfg/types.rs +++ b/fhevm-engine/scheduler/src/dfg/types.rs @@ -11,19 +11,40 @@ pub enum DFGTaskInput { #[derive(Debug)] pub enum SchedulerError { + UnsatisfiedDependence, + CyclicDependence, + DataflowGraphError, + UnknownOperation(i32), + InvalidInputs, SchedulerError, } -impl std::error::Error for SchedulerError { - fn description(&self) -> &str { - match self { - SchedulerError::SchedulerError => "Generic scheduler error", - } - } -} +impl std::error::Error for SchedulerError {} impl std::fmt::Display for SchedulerError { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "SchedulerError") + match self { + Self::UnsatisfiedDependence => { + write!( + f, + "Unsatisfied dependence in dataflow graph at scheduling time" + ) + } + Self::CyclicDependence => { + write!(f, "Dependence cycle in dataflow graph") + } + Self::DataflowGraphError => { + write!(f, "Inconsistent dataflow graph error") + } + Self::UnknownOperation(op) => { + write!(f, "Unknown operation with code: {op}") + } + Self::InvalidInputs => { + write!(f, "Invalid inputs to FHE operation") + } + Self::SchedulerError => { + write!(f, "Generic scheduler error") + } + } } }