Skip to content

Commit

Permalink
Merge pull request #35 from zama-ai/davidk/prom-metrics
Browse files Browse the repository at this point in the history
Add prometheus instrumentation for coprocessor
  • Loading branch information
david-zk authored Sep 25, 2024
2 parents 87160fd + 6ba7227 commit d799643
Show file tree
Hide file tree
Showing 10 changed files with 542 additions and 21 deletions.
399 changes: 394 additions & 5 deletions fhevm-engine/Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions fhevm-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ sha3 = "0.10.8"
anyhow = "1.0.86"
daggy = "0.8.0"
serde = "1.0.210"
prometheus = "0.13.4"

[profile.dev.package.tfhe]
overflow-checks = false
Expand Down
2 changes: 2 additions & 0 deletions fhevm-engine/coprocessor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ fhevm-engine-common = { path = "../fhevm-engine-common" }
strum = { version = "0.26", features = ["derive"] }
bincode.workspace = true
sha3.workspace = true
prometheus.workspace = true
actix-web = "4.9.0"

[dev-dependencies]
testcontainers = "0.21"
Expand Down
10 changes: 6 additions & 4 deletions fhevm-engine/coprocessor/Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@

DB_URL ?= DATABASE_URL=postgres://postgres:[email protected]:5432/coprocessor

.PHONY: build
build:
cargo build
Expand All @@ -11,9 +13,9 @@ cleanup:
init_db:
docker compose up -d
sleep 3
DATABASE_URL=postgres://postgres:[email protected]:5432/coprocessor sqlx db create
DATABASE_URL=postgres://postgres:[email protected]:5432/coprocessor sqlx migrate run
DATABASE_URL=postgres://postgres:[email protected]:5432/coprocessor cargo test setup_test_user -- --nocapture --ignored
$(DB_URL) sqlx db create
$(DB_URL) sqlx migrate run
$(DB_URL) cargo test setup_test_user -- --nocapture --ignored

.PHONY: recreate_db
recreate_db:
Expand All @@ -23,4 +25,4 @@ recreate_db:
.PHONY: clean_run
clean_run:
$(MAKE) recreate_db
RUST_BACKTRACE=1 cargo run -- --run-server --run-bg-worker
RUST_BACKTRACE=1 $(DB_URL) cargo run --release -- --run-server --run-bg-worker
6 changes: 5 additions & 1 deletion fhevm-engine/coprocessor/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct Args {
pub tenant_key_cache_size: i32,

/// Maximum compact inputs to upload
#[arg(long, default_value_t = 8)]
#[arg(long, default_value_t = 10)]
pub maximimum_compact_inputs_upload: usize,

/// Maximum compact inputs to upload
Expand All @@ -55,6 +55,10 @@ pub struct Args {
#[arg(long, default_value = "127.0.0.1:50051")]
pub server_addr: String,

/// Prometheus metrics server address
#[arg(long, default_value = "0.0.0.0:9100")]
pub metrics_addr: String,

/// Postgres database url. If unspecified DATABASE_URL environment variable is used
#[arg(long)]
pub database_url: Option<String>,
Expand Down
6 changes: 6 additions & 0 deletions fhevm-engine/coprocessor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod tests;
mod tfhe_worker;
mod types;
mod utils;
mod metrics;

fn main() {
let args = crate::cli::parse_args();
Expand Down Expand Up @@ -70,6 +71,11 @@ async fn async_main(
set.spawn(crate::tfhe_worker::run_tfhe_worker(args.clone()));
}

if !args.metrics_addr.is_empty() {
println!("Initializing metrics server");
set.spawn(crate::metrics::run_metrics_server(args.clone()));
}

if set.is_empty() {
panic!("No tasks specified to run");
}
Expand Down
27 changes: 27 additions & 0 deletions fhevm-engine/coprocessor/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/// HTTP handler for `GET /metrics`: renders every metric registered in the
/// default prometheus registry in the text exposition format.
async fn metrics() -> impl actix_web::Responder {
    let encoder = prometheus::TextEncoder::new();
    let metric_families = prometheus::gather();
    // Don't panic inside a request handler: on the (unlikely) encoding
    // failure, log it and serve an empty body so the scrape endpoint and
    // its worker stay alive.
    encoder.encode_to_string(&metric_families).unwrap_or_else(|e| {
        eprintln!("error encoding prometheus metrics: {:?}", e);
        String::new()
    })
}

/// Liveness probe for `GET /health`: always answers 200 with an "OK" body.
async fn healthcheck() -> impl actix_web::Responder {
    const BODY: &str = "OK";
    BODY
}

/// Runs the HTTP server exposing prometheus metrics and a healthcheck.
///
/// Routes:
/// - `GET /metrics` — prometheus text-format metrics
/// - `GET /health`  — liveness probe, plain "OK"
///
/// Listens on `args.metrics_addr`. Bind failures are propagated as an
/// `Err` instead of panicking, so the task set in `main` can observe the
/// failure and shut down cleanly like any other worker task.
pub async fn run_metrics_server(
    args: crate::cli::Args,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    println!("metrics server listening at {}", args.metrics_addr);
    actix_web::HttpServer::new(|| {
        actix_web::App::new()
            .route("/metrics", actix_web::web::to(metrics))
            .route("/health", actix_web::web::to(healthcheck))
    })
    .bind(&args.metrics_addr)?
    // a single worker is plenty for low-traffic scrape/probe endpoints
    .workers(1)
    .run()
    .await?;

    Ok(())
}
88 changes: 77 additions & 11 deletions fhevm-engine/coprocessor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ use fhevm_engine_common::tfhe_ops::{
try_expand_ciphertext_list, validate_fhe_type,
};
use fhevm_engine_common::types::{FhevmError, SupportedFheCiphertexts, SupportedFheOperations};
use prometheus::{register_int_counter, IntCounter};
use sha3::{Digest, Keccak256};
use sqlx::{query, Acquire};
use tokio::task::spawn_blocking;
use tonic::transport::Server;
use lazy_static::lazy_static;

pub mod common {
tonic::include_proto!("fhevm.common");
Expand All @@ -34,6 +36,25 @@ pub mod coprocessor {
tonic::include_proto!("fhevm.coprocessor");
}

// Prometheus counters for the coprocessor grpc API. Each endpoint gets a
// pair: total calls and errored calls (successes = count - errors). They are
// registered into the default registry on first use, which is what the
// metrics HTTP server gathers from. The `.unwrap()` is safe as long as each
// metric name is registered exactly once in the process.
lazy_static! {
static ref UPLOAD_INPUTS_COUNTER: IntCounter =
register_int_counter!("coprocessor_upload_inputs_count", "grpc calls for inputs upload endpoint").unwrap();
static ref UPLOAD_INPUTS_ERRORS: IntCounter =
register_int_counter!("coprocessor_upload_inputs_errors", "grpc errors while calling upload inputs").unwrap();
static ref ASYNC_COMPUTE_COUNTER: IntCounter =
register_int_counter!("coprocessor_async_compute_count", "grpc calls for async compute endpoint").unwrap();
static ref ASYNC_COMPUTE_ERRORS: IntCounter =
register_int_counter!("coprocessor_async_compute_errors", "grpc errors while calling async compute").unwrap();
static ref TRIVIAL_ENCRYPT_COUNTER: IntCounter =
register_int_counter!("coprocessor_trivial_encrypt_count", "grpc calls for trivial encrypt endpoint").unwrap();
static ref TRIVIAL_ENCRYPT_ERRORS: IntCounter =
register_int_counter!("coprocessor_trivial_encrypt_errors", "grpc errors while calling trivial encrypt").unwrap();
static ref GET_CIPHERTEXTS_COUNTER: IntCounter =
register_int_counter!("coprocessor_get_ciphertexts_count", "grpc calls for get ciphertexts endpoint").unwrap();
static ref GET_CIPHERTEXTS_ERRORS: IntCounter =
register_int_counter!("coprocessor_get_ciphertexts_errors", "grpc errors while calling get ciphertexts").unwrap();
}

pub struct CoprocessorService {
pool: sqlx::Pool<sqlx::Postgres>,
args: crate::cli::Args,
Expand Down Expand Up @@ -140,6 +161,58 @@ impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorServ
&self,
request: tonic::Request<InputUploadBatch>,
) -> std::result::Result<tonic::Response<InputUploadResponse>, tonic::Status> {
UPLOAD_INPUTS_COUNTER.inc();
self.upload_inputs_impl(request).await.inspect_err(|_| {
UPLOAD_INPUTS_ERRORS.inc();
})
}

/// Instrumented grpc entry point: counts every call, delegates the actual
/// work to `async_compute_impl`, and bumps the error counter on failure.
async fn async_compute(
    &self,
    request: tonic::Request<coprocessor::AsyncComputeRequest>,
) -> std::result::Result<tonic::Response<coprocessor::GenericResponse>, tonic::Status> {
    ASYNC_COMPUTE_COUNTER.inc();
    let result = self.async_compute_impl(request).await;
    if result.is_err() {
        ASYNC_COMPUTE_ERRORS.inc();
    }
    result
}

/// Not part of the supported API surface yet; always answers UNIMPLEMENTED.
async fn wait_computations(
    &self,
    _request: tonic::Request<coprocessor::AsyncComputeRequest>,
) -> std::result::Result<tonic::Response<coprocessor::FhevmResponses>, tonic::Status> {
    Err(tonic::Status::unimplemented("not implemented"))
}

/// Instrumented grpc entry point: counts every call, delegates to
/// `trivial_encrypt_ciphertexts_impl`, and bumps the error counter on failure.
async fn trivial_encrypt_ciphertexts(
    &self,
    request: tonic::Request<coprocessor::TrivialEncryptBatch>,
) -> std::result::Result<tonic::Response<coprocessor::GenericResponse>, tonic::Status> {
    TRIVIAL_ENCRYPT_COUNTER.inc();
    let result = self.trivial_encrypt_ciphertexts_impl(request).await;
    if result.is_err() {
        TRIVIAL_ENCRYPT_ERRORS.inc();
    }
    result
}

/// Instrumented grpc entry point: counts every call, delegates to
/// `get_ciphertexts_impl`, and bumps the error counter on failure.
async fn get_ciphertexts(
    &self,
    request: tonic::Request<coprocessor::GetCiphertextBatch>,
) -> std::result::Result<tonic::Response<coprocessor::GetCiphertextResponse>, tonic::Status>
{
    GET_CIPHERTEXTS_COUNTER.inc();
    let result = self.get_ciphertexts_impl(request).await;
    if result.is_err() {
        GET_CIPHERTEXTS_ERRORS.inc();
    }
    result
}
}

impl CoprocessorService {
async fn upload_inputs_impl(
&self,
request: tonic::Request<InputUploadBatch>,
) -> std::result::Result<tonic::Response<InputUploadResponse>, tonic::Status> {
UPLOAD_INPUTS_COUNTER.inc();

let tenant_id = check_if_api_key_is_valid(&request, &self.pool).await?;

let req = request.get_ref();
Expand Down Expand Up @@ -400,7 +473,7 @@ impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorServ
Ok(tonic::Response::new(response))
}

async fn async_compute(
async fn async_compute_impl(
&self,
request: tonic::Request<coprocessor::AsyncComputeRequest>,
) -> std::result::Result<tonic::Response<coprocessor::GenericResponse>, tonic::Status> {
Expand Down Expand Up @@ -534,14 +607,7 @@ impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorServ
return Ok(tonic::Response::new(GenericResponse { response_code: 0 }));
}

async fn wait_computations(
&self,
_request: tonic::Request<coprocessor::AsyncComputeRequest>,
) -> std::result::Result<tonic::Response<coprocessor::FhevmResponses>, tonic::Status> {
return Err(tonic::Status::unimplemented("not implemented"));
}

async fn trivial_encrypt_ciphertexts(
async fn trivial_encrypt_ciphertexts_impl(
&self,
request: tonic::Request<coprocessor::TrivialEncryptBatch>,
) -> std::result::Result<tonic::Response<coprocessor::GenericResponse>, tonic::Status> {
Expand Down Expand Up @@ -616,7 +682,7 @@ impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorServ
return Ok(tonic::Response::new(GenericResponse { response_code: 0 }));
}

async fn get_ciphertexts(
async fn get_ciphertexts_impl(
&self,
request: tonic::Request<coprocessor::GetCiphertextBatch>,
) -> std::result::Result<tonic::Response<coprocessor::GetCiphertextResponse>, tonic::Status>
Expand Down Expand Up @@ -676,4 +742,4 @@ impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorServ

return Ok(tonic::Response::new(result));
}
}
}
1 change: 1 addition & 0 deletions fhevm-engine/coprocessor/src/tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ async fn start_coprocessor(rx: Receiver<bool>, app_port: u16, db_url: &str) {
tokio_threads: 2,
pg_pool_max_connections: 2,
server_addr: format!("127.0.0.1:{app_port}"),
metrics_addr: "".to_string(),
database_url: Some(db_url.to_string()),
maximimum_compact_inputs_upload: 10,
coprocessor_private_key: "./coprocessor.key".to_string(),
Expand Down
23 changes: 23 additions & 0 deletions fhevm-engine/coprocessor/src/tfhe_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,36 @@ use fhevm_engine_common::{
tfhe_ops::{current_ciphertext_version, perform_fhe_operation},
types::SupportedFheOperations,
};
use prometheus::{register_int_counter, IntCounter};
use sqlx::{postgres::PgListener, query, Acquire};
use std::{
collections::{BTreeSet, HashMap},
num::NonZeroUsize,
};
use lazy_static::lazy_static;

// Prometheus counters for the background tfhe worker loop: cycle errors,
// how work is discovered (timer poll vs postgres notification), and the
// per-item found/errored/processed totals. Registered into the default
// registry on first use; `.unwrap()` is safe as long as each metric name
// is registered exactly once in the process.
lazy_static! {
static ref WORKER_ERRORS_COUNTER: IntCounter =
register_int_counter!("coprocessor_worker_errors", "worker errors encountered").unwrap();
static ref WORK_ITEMS_POLL_COUNTER: IntCounter =
register_int_counter!("coprocessor_work_items_polls", "times work items are polled from database").unwrap();
static ref WORK_ITEMS_NOTIFICATIONS_COUNTER: IntCounter =
register_int_counter!("coprocessor_work_items_notifications", "times instant notifications for work items received from the database").unwrap();
static ref WORK_ITEMS_FOUND_COUNTER: IntCounter =
register_int_counter!("coprocessor_work_items_found", "work items queried from database").unwrap();
static ref WORK_ITEMS_ERRORS_COUNTER: IntCounter =
register_int_counter!("coprocessor_work_items_errors", "work items errored out during computation").unwrap();
static ref WORK_ITEMS_PROCESSED_COUNTER: IntCounter =
register_int_counter!("coprocessor_work_items_processed", "work items successfully processed and stored in the database").unwrap();
}

pub async fn run_tfhe_worker(
args: crate::cli::Args,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
loop {
// here we log the errors and make sure we retry
if let Err(cycle_error) = tfhe_worker_cycle(&args).await {
WORKER_ERRORS_COUNTER.inc();
eprintln!(
"Error in background worker, retrying shortly: {:?}",
cycle_error
Expand Down Expand Up @@ -49,9 +67,11 @@ async fn tfhe_worker_cycle(
if !immedially_poll_more_work {
tokio::select! {
_ = listener.try_recv() => {
WORK_ITEMS_NOTIFICATIONS_COUNTER.inc();
println!("Received work_available notification from postgres");
},
_ = tokio::time::sleep(tokio::time::Duration::from_millis(5000)) => {
WORK_ITEMS_POLL_COUNTER.inc();
println!("Polling the database for more work on timer");
},
};
Expand Down Expand Up @@ -93,6 +113,7 @@ async fn tfhe_worker_cycle(
continue;
}

WORK_ITEMS_FOUND_COUNTER.inc_by(the_work.len() as u64);
println!("Processing {} work items", the_work.len());

// make sure we process each tenant sequentially not to
Expand Down Expand Up @@ -248,8 +269,10 @@ async fn tfhe_worker_cycle(
)
.execute(trx.as_mut())
.await?;
WORK_ITEMS_PROCESSED_COUNTER.inc();
}
Err((err, tenant_id, output_handle)) => {
WORKER_ERRORS_COUNTER.inc();
let _ = query!(
"
UPDATE computations
Expand Down

0 comments on commit d799643

Please sign in to comment.