From e1309284cdfd8533fb1a840992eac5a88142047b Mon Sep 17 00:00:00 2001 From: Yury Akudovich Date: Fri, 22 Nov 2024 16:41:10 +0100 Subject: [PATCH] fix(prover): Create reqwest client only once Creating reqwest client is expensive because it initializes TLS, loads certificates, etc. So it should be created only once and reused. Create new internal mod http_client instead of patching zksync_utils because fn `send_request_with_retries` is used only in prover_autoscaler and outdated prover_gpu_fri, which will be removed soon. Additionally HttpClient exports metric `calls` with all the requests and correct status codes. ref ZKD-1855 --- .../prover_autoscaler/src/global/queuer.rs | 24 ++--- .../prover_autoscaler/src/global/watcher.rs | 37 ++++---- .../bin/prover_autoscaler/src/http_client.rs | 94 +++++++++++++++++++ .../bin/prover_autoscaler/src/k8s/watcher.rs | 15 ++- .../crates/bin/prover_autoscaler/src/lib.rs | 1 + .../crates/bin/prover_autoscaler/src/main.rs | 24 ++++- .../bin/prover_autoscaler/src/metrics.rs | 2 - 7 files changed, 151 insertions(+), 46 deletions(-) create mode 100644 prover/crates/bin/prover_autoscaler/src/http_client.rs diff --git a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs index baeb5b70a4ef..bc781e793408 100644 --- a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs +++ b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs @@ -3,14 +3,8 @@ use std::{collections::HashMap, ops::Deref}; use anyhow::{Context, Ok}; use reqwest::Method; use zksync_prover_job_monitor::autoscaler_queue_reporter::{QueueReport, VersionedQueueReport}; -use zksync_utils::http_with_retries::send_request_with_retries; -use crate::{ config::QueueReportFields, metrics::{AUTOSCALER_METRICS, DEFAULT_ERROR_CODE}, }; - -const MAX_RETRIES: usize = 5; +use crate::{config::QueueReportFields, http_client::HttpClient}; pub struct Queue(HashMap<(String, QueueReportFields), u64>); @@ -23,6 +17,7 @@ 
impl Deref for Queue { #[derive(Default)] pub struct Queuer { + http_client: HttpClient, pub prover_job_monitor_url: String, } @@ -40,8 +35,9 @@ fn target_to_queue(target: QueueReportFields, report: &QueueReport) -> u64 { } impl Queuer { - pub fn new(pjm_url: String) -> Self { + pub fn new(http_client: HttpClient, pjm_url: String) -> Self { Self { + http_client, prover_job_monitor_url: pjm_url, } } @@ -50,13 +46,13 @@ impl Queuer { /// list of jobs. pub async fn get_queue(&self, jobs: &[QueueReportFields]) -> anyhow::Result { let url = &self.prover_job_monitor_url; - let response = send_request_with_retries(url, MAX_RETRIES, Method::GET, None, None).await; - let response = response.map_err(|err| { - AUTOSCALER_METRICS.calls[&(url.clone(), DEFAULT_ERROR_CODE)].inc(); - anyhow::anyhow!("Failed fetching queue from URL: {url}: {err:?}") - })?; + let response = self + .http_client + .send_request_with_retries(url, Method::GET, None, None) + .await; + let response = response + .map_err(|err| anyhow::anyhow!("Failed fetching queue from URL: {url}: {err:?}"))?; - AUTOSCALER_METRICS.calls[&(url.clone(), response.status().as_u16())].inc(); let response = response .json::>() .await diff --git a/prover/crates/bin/prover_autoscaler/src/global/watcher.rs b/prover/crates/bin/prover_autoscaler/src/global/watcher.rs index 95b9e32cac5b..9a56471b72d5 100644 --- a/prover/crates/bin/prover_autoscaler/src/global/watcher.rs +++ b/prover/crates/bin/prover_autoscaler/src/global/watcher.rs @@ -8,17 +8,14 @@ use reqwest::{ }; use tokio::sync::Mutex; use url::Url; -use zksync_utils::http_with_retries::send_request_with_retries; use crate::{ agent::{ScaleRequest, ScaleResponse}, cluster_types::{Cluster, Clusters}, - metrics::{AUTOSCALER_METRICS, DEFAULT_ERROR_CODE}, + http_client::HttpClient, task_wiring::Task, }; -const MAX_RETRIES: usize = 5; - #[derive(Default)] pub struct WatchedData { pub clusters: Clusters, @@ -36,6 +33,7 @@ pub fn check_is_ready(v: &Vec) -> Result<()> { 
#[derive(Default, Clone)] pub struct Watcher { + http_client: HttpClient, /// List of base URLs of all agents. pub cluster_agents: Vec>, pub dry_run: bool, @@ -43,9 +41,10 @@ pub struct Watcher { } impl Watcher { - pub fn new(agent_urls: Vec, dry_run: bool) -> Self { + pub fn new(http_client: HttpClient, agent_urls: Vec, dry_run: bool) -> Self { let size = agent_urls.len(); Self { + http_client, cluster_agents: agent_urls .into_iter() .map(|u| { @@ -92,6 +91,7 @@ impl Watcher { .unwrap() .to_string(); tracing::debug!("Sending scale request to {}, data: {:?}.", url, sr); + let http_client = self.http_client.clone(); tokio::spawn(async move { let mut headers = HeaderMap::new(); headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); @@ -99,19 +99,17 @@ impl Watcher { tracing::info!("Dry-run mode, not sending the request."); return Ok((id, Ok(ScaleResponse::default()))); } - let response = send_request_with_retries( - &url, - MAX_RETRIES, - Method::POST, - Some(headers), - Some(serde_json::to_vec(&sr)?), - ) - .await; + let response = http_client + .send_request_with_retries( + &url, + Method::POST, + Some(headers), + Some(serde_json::to_vec(&sr)?), + ) + .await; let response = response.map_err(|err| { - AUTOSCALER_METRICS.calls[&(url.clone(), DEFAULT_ERROR_CODE)].inc(); anyhow::anyhow!("Failed fetching cluster from url: {url}: {err:?}") })?; - AUTOSCALER_METRICS.calls[&(url, response.status().as_u16())].inc(); let response = response .json::() .await @@ -164,21 +162,20 @@ impl Task for Watcher { .enumerate() .map(|(i, a)| { tracing::debug!("Getting cluster data from agent {}.", a); + let http_client = self.http_client.clone(); tokio::spawn(async move { let url: String = a .clone() .join("/cluster") .context("Failed to join URL with /cluster")? 
.to_string(); - let response = - send_request_with_retries(&url, MAX_RETRIES, Method::GET, None, None).await; + let response = http_client + .send_request_with_retries(&url, Method::GET, None, None) + .await; let response = response.map_err(|err| { - // TODO: refactor send_request_with_retries to return status. - AUTOSCALER_METRICS.calls[&(url.clone(), DEFAULT_ERROR_CODE)].inc(); anyhow::anyhow!("Failed fetching cluster from url: {url}: {err:?}") })?; - AUTOSCALER_METRICS.calls[&(url, response.status().as_u16())].inc(); let response = response .json::() .await diff --git a/prover/crates/bin/prover_autoscaler/src/http_client.rs b/prover/crates/bin/prover_autoscaler/src/http_client.rs new file mode 100644 index 000000000000..6710ea53a26d --- /dev/null +++ b/prover/crates/bin/prover_autoscaler/src/http_client.rs @@ -0,0 +1,94 @@ +use reqwest::{header::HeaderMap, Client, Error, Method, Response, StatusCode}; +use tokio::time::{sleep, Duration}; + +use crate::metrics::AUTOSCALER_METRICS; + +#[derive(Clone)] +pub struct HttpClient { + client: Client, + max_retries: usize, +} + +impl Default for HttpClient { + fn default() -> Self { + Self { + client: Client::new(), + max_retries: 5, + } + } +} + +#[derive(Debug)] +pub enum HttpError { + ReqwestError(Error), + RetryExhausted(String), +} + +impl HttpClient { + /// Method to send HTTP request with fixed number of retries with exponential back-offs. 
+ pub async fn send_request_with_retries( + &self, + url: &str, + method: Method, + headers: Option, + body: Option>, + ) -> Result { + let mut retries = 0usize; + let mut delay = Duration::from_secs(1); + loop { + let result = self + .send_request(url, method.clone(), headers.clone(), body.clone()) + .await; + AUTOSCALER_METRICS.calls[&( + url.into(), + match result { + Ok(ref response) => response.status().as_u16(), + Err(ref err) => err + .status() + .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR) + .as_u16(), + }, + )] + .inc(); + match result { + Ok(response) if response.status().is_success() => return Ok(response), + Ok(response) => { + tracing::error!("Received non OK http response {:?}", response.status()) + } + Err(err) => tracing::error!("Error while sending http request {:?}", err), + } + + if retries >= self.max_retries { + return Err(HttpError::RetryExhausted(format!( + "All {} http retries failed", + self.max_retries + ))); + } + retries += 1; + sleep(delay).await; + delay = delay.checked_mul(2).unwrap_or(Duration::MAX); + } + } + + async fn send_request( + &self, + url: &str, + method: Method, + headers: Option, + body: Option>, + ) -> Result { + let mut request = self.client.request(method, url); + + if let Some(headers) = headers { + request = request.headers(headers); + } + + if let Some(body) = body { + request = request.body(body); + } + + let request = request.build()?; + let response = self.client.execute(request).await?; + Ok(response) + } +} diff --git a/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs b/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs index b8476ab475ab..4730a0259e4c 100644 --- a/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs +++ b/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs @@ -13,9 +13,11 @@ use reqwest::{ Method, }; use tokio::sync::Mutex; -use zksync_utils::http_with_retries::send_request_with_retries; -use crate::cluster_types::{Cluster, Deployment, Namespace, Pod, ScaleEvent}; +use 
crate::{ + cluster_types::{Cluster, Deployment, Namespace, Pod, ScaleEvent}, + http_client::HttpClient, +}; #[derive(Clone)] pub struct Watcher { @@ -23,11 +25,13 @@ pub struct Watcher { pub cluster: Arc>, } -async fn get_cluster_name() -> anyhow::Result { +async fn get_cluster_name(http_client: HttpClient) -> anyhow::Result { let mut headers = HeaderMap::new(); headers.insert("Metadata-Flavor", HeaderValue::from_static("Google")); let url = "http://metadata.google.internal/computeMetadata/v1/instance/attributes/cluster-name"; - let response = send_request_with_retries(url, 5, Method::GET, Some(headers), None).await; + let response = http_client + .send_request_with_retries(url, Method::GET, Some(headers), None) + .await; response .map_err(|err| anyhow::anyhow!("Failed fetching response from url: {url}: {err:?}"))? .text() @@ -37,6 +41,7 @@ async fn get_cluster_name() -> anyhow::Result { impl Watcher { pub async fn new( + http_client: HttpClient, client: kube::Client, cluster_name: Option, namespaces: Vec, @@ -48,7 +53,7 @@ impl Watcher { let cluster_name = match cluster_name { Some(c) => c, - None => get_cluster_name() + None => get_cluster_name(http_client) .await .expect("Load cluster_name from GCP"), }; diff --git a/prover/crates/bin/prover_autoscaler/src/lib.rs b/prover/crates/bin/prover_autoscaler/src/lib.rs index 019fe2b7fb4d..1861f3af10da 100644 --- a/prover/crates/bin/prover_autoscaler/src/lib.rs +++ b/prover/crates/bin/prover_autoscaler/src/lib.rs @@ -2,6 +2,7 @@ pub mod agent; pub(crate) mod cluster_types; pub mod config; pub mod global; +pub mod http_client; pub mod k8s; pub(crate) mod metrics; pub mod task_wiring; diff --git a/prover/crates/bin/prover_autoscaler/src/main.rs b/prover/crates/bin/prover_autoscaler/src/main.rs index 98ffdb49d824..3baf3d13b2d6 100644 --- a/prover/crates/bin/prover_autoscaler/src/main.rs +++ b/prover/crates/bin/prover_autoscaler/src/main.rs @@ -10,6 +10,7 @@ use zksync_prover_autoscaler::{ agent, config::{config_from_yaml, 
ProverAutoscalerConfig}, global::{self}, + http_client::HttpClient, k8s::{Scaler, Watcher}, task_wiring::TaskRunner, }; @@ -74,6 +75,8 @@ async fn main() -> anyhow::Result<()> { let mut tasks = vec![]; + let http_client = HttpClient::default(); + match opt.job { AutoscalerType::Agent => { tracing::info!("Starting ProverAutoscaler Agent"); @@ -84,8 +87,13 @@ async fn main() -> anyhow::Result<()> { let _ = rustls::crypto::ring::default_provider().install_default(); let client = kube::Client::try_default().await?; - let watcher = - Watcher::new(client.clone(), opt.cluster_name, agent_config.namespaces).await; + let watcher = Watcher::new( + http_client, + client.clone(), + opt.cluster_name, + agent_config.namespaces, + ) + .await; let scaler = Scaler::new(client, agent_config.dry_run); tasks.push(tokio::spawn(watcher.clone().run())); tasks.push(tokio::spawn(agent::run_server( @@ -101,9 +109,15 @@ async fn main() -> anyhow::Result<()> { let interval = scaler_config.scaler_run_interval; let exporter_config = PrometheusExporterConfig::pull(scaler_config.prometheus_port); tasks.push(tokio::spawn(exporter_config.run(stop_receiver.clone()))); - let watcher = - global::watcher::Watcher::new(scaler_config.agents.clone(), scaler_config.dry_run); - let queuer = global::queuer::Queuer::new(scaler_config.prover_job_monitor_url.clone()); + let watcher = global::watcher::Watcher::new( + http_client.clone(), + scaler_config.agents.clone(), + scaler_config.dry_run, + ); + let queuer = global::queuer::Queuer::new( + http_client, + scaler_config.prover_job_monitor_url.clone(), + ); let scaler = global::scaler::Scaler::new(watcher.clone(), queuer, scaler_config); tasks.extend(get_tasks(watcher, scaler, interval, stop_receiver)?); } diff --git a/prover/crates/bin/prover_autoscaler/src/metrics.rs b/prover/crates/bin/prover_autoscaler/src/metrics.rs index 115ae3b74259..775f7ec22abd 100644 --- a/prover/crates/bin/prover_autoscaler/src/metrics.rs +++ 
b/prover/crates/bin/prover_autoscaler/src/metrics.rs @@ -2,8 +2,6 @@ use vise::{Counter, Gauge, LabeledFamily, Metrics}; use crate::config::Gpu; -pub const DEFAULT_ERROR_CODE: u16 = 500; - #[derive(Debug, Metrics)] #[metrics(prefix = "autoscaler")] pub(crate) struct AutoscalerMetrics {