Skip to content

Commit

Permalink
Merge pull request #111 from zama-ai/davidk/tenant-tooling
Browse files Browse the repository at this point in the history
feat: add coprocessor cli tooling
  • Loading branch information
david-zk authored Nov 7, 2024
2 parents 1e455b9 + 0c11f9a commit cf4506b
Show file tree
Hide file tree
Showing 13 changed files with 169 additions and 41 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions fhevm-engine/coprocessor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
name = "coprocessor"
version = "0.1.0"
edition = "2021"
default-run = "coprocessor"

[dependencies]
# workspace dependencies
Expand Down Expand Up @@ -52,3 +53,11 @@ testcontainers = "0.21"

[build-dependencies]
tonic-build = "0.12"

[[bin]]
name = "coprocessor"
path = "src/bin/coprocessor.rs"

[[bin]]
name = "cli"
path = "src/bin/cli.rs"
5 changes: 4 additions & 1 deletion fhevm-engine/coprocessor/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ FROM debian:bullseye-slim
RUN useradd -m zama

COPY --from=build /app/fhevm-engine/target/release/coprocessor /usr/local/bin/
COPY --from=build /app/fhevm-engine/target/release/cli /usr/local/bin/coprocessor-cli

RUN chown zama:zama /usr/local/bin/coprocessor && \
chmod 500 /usr/local/bin/coprocessor
RUN chown zama:zama /usr/local/bin/coprocessor-cli && \
chmod 500 /usr/local/bin/coprocessor-cli

USER zama

ENTRYPOINT ["/usr/local/bin/coprocessor"]
ENTRYPOINT ["/usr/local/bin/coprocessor"]
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,4 @@ CREATE TABLE IF NOT EXISTS tenants (
CREATE INDEX IF NOT EXISTS computations_dependencies_index ON computations USING GIN (dependencies);
CREATE INDEX IF NOT EXISTS computations_completed_index ON computations (is_completed);
CREATE INDEX IF NOT EXISTS computations_errors_index ON computations (is_error);
CREATE INDEX IF NOT EXISTS tenants_by_api_key ON tenants (tenant_api_key);
CREATE UNIQUE INDEX IF NOT EXISTS tenants_by_api_key ON tenants (tenant_api_key);
99 changes: 99 additions & 0 deletions fhevm-engine/coprocessor/src/bin/cli.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use std::str::FromStr;

use clap::Parser;
use sqlx::types::Uuid;

#[derive(Parser, Debug, Clone)]
#[command(version, about, long_about = None)]
pub enum Args {
/// Inserts tenant into specified database
InsertTenant {
/// PKS file path
#[arg(long)]
pks_file: String,
/// SKS file path
#[arg(long)]
sks_file: String,
/// Public params file path
#[arg(long)]
public_params_file: String,
/// Tenant api key
#[arg(long)]
tenant_api_key: String,
/// ACL contract address
#[arg(long)]
acl_contract_address: String,
/// Input verifier address
#[arg(long)]
verifying_contract_address: String,
/// Chain id
#[arg(long)]
chain_id: u32,
},
}

fn main() {
let args = Args::parse();
match args {
Args::InsertTenant { pks_file, sks_file, public_params_file, tenant_api_key, acl_contract_address, verifying_contract_address, chain_id } => {
let db_url = std::env::var("DATABASE_URL")
.expect("DATABASE_URL environment variable is undefined");
let pks_file = std::fs::read(&pks_file)
.expect("Can't read pks file");
let sks_file = std::fs::read(&sks_file)
.expect("Can't read pks file");
let public_params_file = std::fs::read(&public_params_file)
.expect("Can't read public params file");
let _ = alloy::primitives::Address::from_str(&acl_contract_address)
.expect("Can't parse acl contract adddress");
let _ = alloy::primitives::Address::from_str(&verifying_contract_address)
.expect("Can't parse input verifier adddress");
let tenant_api_key = Uuid::from_str(&tenant_api_key).expect("Can't parse tenant api key");

tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async move {
let pool = sqlx::postgres::PgPoolOptions::new()
.max_connections(1)
.connect(&db_url)
.await.expect("Can't connect to postgres instance");


sqlx::query!(
"
INSERT INTO tenants(
tenant_api_key,
chain_id,
acl_contract_address,
verifying_contract_address,
pks_key,
sks_key,
public_params
)
VALUES (
$1,
$2,
$3,
$4,
$5,
$6,
$7
)
",
tenant_api_key,
chain_id as i32,
&acl_contract_address,
&verifying_contract_address,
&pks_file,
&sks_file,
&public_params_file
)
.execute(&pool)
.await
.expect("Can't insert new tenant");
});
},
}
}
13 changes: 13 additions & 0 deletions fhevm-engine/coprocessor/src/bin/coprocessor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
fn main() {
let args = coprocessor::daemon_cli::parse_args();
assert!(
args.work_items_batch_size < args.tenant_key_cache_size,
"Work items batch size must be less than tenant key cache size"
);

if args.generate_fhe_keys {
coprocessor::generate_dump_fhe_keys();
} else {
coprocessor::start_runtime(args, None);
}
}
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,37 +1,22 @@
use std::sync::Once;

use ::tracing::{error, info};
use fhevm_engine_common::keys::{FhevmKeys, SerializedFhevmKeys};
use tokio::task::JoinSet;
use ::tracing::{error, info};

mod cli;
pub mod daemon_cli;
mod db_queries;
mod metrics;
mod server;
pub mod metrics;
pub mod server;
#[cfg(test)]
mod tests;
mod tfhe_worker;
mod tracing;
pub mod tfhe_worker;
pub mod tracing;
mod types;
mod utils;

fn main() {
let args = crate::cli::parse_args();
assert!(
args.work_items_batch_size < args.tenant_key_cache_size,
"Work items batch size must be less than tenant key cache size"
);

if args.generate_fhe_keys {
generate_dump_fhe_keys();
} else {
start_runtime(args, None);
}
}

// separate function for testing
pub fn start_runtime(
args: crate::cli::Args,
args: daemon_cli::Args,
close_recv: Option<tokio::sync::watch::Receiver<bool>>,
) {
tokio::runtime::Builder::new_multi_thread()
Expand Down Expand Up @@ -64,8 +49,8 @@ pub fn start_runtime(
// Used for testing as we would call `async_main()` multiple times.
static TRACING_INIT: Once = Once::new();

async fn async_main(
args: crate::cli::Args,
pub async fn async_main(
args: daemon_cli::Args,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
TRACING_INIT.call_once(|| {
tracing_subscriber::fmt().json().with_level(true).init();
Expand All @@ -78,17 +63,17 @@ async fn async_main(
let mut set = JoinSet::new();
if args.run_server {
info!(target: "async_main", "Initializing api server");
set.spawn(crate::server::run_server(args.clone()));
set.spawn(server::run_server(args.clone()));
}

if args.run_bg_worker {
info!(target: "async_main", "Initializing background worker");
set.spawn(crate::tfhe_worker::run_tfhe_worker(args.clone()));
set.spawn(tfhe_worker::run_tfhe_worker(args.clone()));
}

if !args.metrics_addr.is_empty() {
info!(target: "async_main", "Initializing metrics server");
set.spawn(crate::metrics::run_metrics_server(args.clone()));
set.spawn(metrics::run_metrics_server(args.clone()));
}

if set.is_empty() {
Expand All @@ -104,8 +89,8 @@ async fn async_main(
Ok(())
}

fn generate_dump_fhe_keys() {
pub fn generate_dump_fhe_keys() {
let keys = FhevmKeys::new();
let ser_keys: SerializedFhevmKeys = keys.into();
ser_keys.save_to_disk();
}
}
2 changes: 1 addition & 1 deletion fhevm-engine/coprocessor/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ async fn healthcheck() -> impl actix_web::Responder {
}

pub async fn run_metrics_server(
args: crate::cli::Args,
args: crate::daemon_cli::Args,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!("metrics server listening at {}", args.metrics_addr);
let _ = actix_web::HttpServer::new(|| {
Expand Down
6 changes: 3 additions & 3 deletions fhevm-engine/coprocessor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ lazy_static! {

struct CoprocessorService {
pool: sqlx::Pool<sqlx::Postgres>,
args: crate::cli::Args,
args: crate::daemon_cli::Args,
tenant_key_cache: std::sync::Arc<tokio::sync::RwLock<lru::LruCache<i32, TfheTenantKeys>>>,
signer: PrivateKeySigner,
get_ciphertext_eip712_domain: Eip712Domain,
}

pub async fn run_server(
args: crate::cli::Args,
args: crate::daemon_cli::Args,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
loop {
if let Err(e) = run_server_iteration(args.clone()).await {
Expand All @@ -106,7 +106,7 @@ pub async fn run_server(
}

pub async fn run_server_iteration(
args: crate::cli::Args,
args: crate::daemon_cli::Args,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let addr = args
.server_addr
Expand Down
5 changes: 2 additions & 3 deletions fhevm-engine/coprocessor/src/tests/utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::cli::Args;
use crate::daemon_cli::Args;
use fhevm_engine_common::tfhe_ops::current_ciphertext_version;
use fhevm_engine_common::types::SupportedFheCiphertexts;
use fhevm_engine_common::utils::{safe_deserialize, safe_deserialize_key};
Expand Down Expand Up @@ -214,10 +214,9 @@ pub async fn setup_test_user(pool: &sqlx::PgPool) -> Result<(), Box<dyn std::err
.expect("can't read public params");
sqlx::query!(
"
INSERT INTO tenants(tenant_api_key, tenant_id, chain_id, acl_contract_address, verifying_contract_address, pks_key, sks_key, public_params, cks_key)
INSERT INTO tenants(tenant_api_key, chain_id, acl_contract_address, verifying_contract_address, pks_key, sks_key, public_params, cks_key)
VALUES (
'a1503fb6-d79b-4e9e-826d-44cf262f3e05',
1,
12345,
'0x339EcE85B9E11a3A3AA557582784a15d7F82AAf2',
'0x69dE3158643e738a0724418b21a35FAA20CBb1c5',
Expand Down
4 changes: 2 additions & 2 deletions fhevm-engine/coprocessor/src/tfhe_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ lazy_static! {
}

pub async fn run_tfhe_worker(
args: crate::cli::Args,
args: crate::daemon_cli::Args,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
loop {
// here we log the errors and make sure we retry
Expand All @@ -59,7 +59,7 @@ pub async fn run_tfhe_worker(
}

async fn tfhe_worker_cycle(
args: &crate::cli::Args,
args: &crate::daemon_cli::Args,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let tracer = opentelemetry::global::tracer("tfhe_worker");

Expand Down
2 changes: 1 addition & 1 deletion fhevm-engine/coprocessor/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ pub fn sort_computations_by_dependencies<'a>(
Ok((res, handles_to_check_in_db))
}

pub fn db_url(args: &crate::cli::Args) -> String {
pub fn db_url(args: &crate::daemon_cli::Args) -> String {
if let Some(db_url) = &args.database_url {
return db_url.clone();
}
Expand Down

0 comments on commit cf4506b

Please sign in to comment.